Add http sharding

This commit is contained in:
Lightczx
2023-12-04 16:09:28 +08:00
parent 70cb4b8285
commit 1c67da607c
3 changed files with 151 additions and 50 deletions

View File

@@ -0,0 +1,124 @@
// Copyright (c) DGP Studio. All rights reserved.
// Licensed under the MIT license.
using Microsoft.Win32.SafeHandles;
using Snap.Hutao.Web.Request.Builder;
using System.IO;
using System.Net.Http;
namespace Snap.Hutao.Core.IO.Http.Sharding;
internal sealed class HttpShardCopyWorker : IDisposable
{
private const int ShardSize = 4 * 1024 * 1024;
private readonly HttpClient httpClient;
private readonly string sourceUrl;
private readonly int bufferSize;
private readonly SafeFileHandle destFileHandle;
private readonly List<Shard> shards;
private HttpShardCopyWorker(HttpClient httpClient, string sourceUrl, string destFilePath, long contentLength, int bufferSize)
{
this.httpClient = httpClient;
this.sourceUrl = sourceUrl;
this.bufferSize = bufferSize;
destFileHandle = File.OpenHandle(
destFilePath,
FileMode.Create,
FileAccess.Write,
FileShare.None,
FileOptions.RandomAccess | FileOptions.Asynchronous,
contentLength);
shards = CalculateShards(contentLength);
}
public static async ValueTask<HttpShardCopyWorker> CreateAsync(HttpClient httpClient, string sourceUrl, string destFilePath, int bufferSize = 81920)
{
HttpResponseMessage response = await httpClient.HeadAsync(sourceUrl, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
long contentLength = response.Content.Headers.ContentLength ?? 0;
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(contentLength);
return new(httpClient, sourceUrl, destFilePath, contentLength, bufferSize);
}
[SuppressMessage("", "SH003")]
public Task CopyAsync(CancellationToken cancellationToken = default)
{
return Parallel.ForEachAsync(shards, cancellationToken, CopyShardAsync);
}
public void Dispose()
{
destFileHandle.Dispose();
}
private static List<Shard> CalculateShards(long contentLength)
{
List<Shard> shards = [];
long currentOffset = 0;
while (currentOffset < contentLength)
{
long end = Math.Min(currentOffset + ShardSize, contentLength) - 1;
shards.Add(new Shard(currentOffset, end));
currentOffset = end + 1;
}
return shards;
}
private async ValueTask CopyShardAsync(Shard shard, CancellationToken token)
{
HttpRequestMessage request = new(HttpMethod.Get, sourceUrl)
{
Headers =
{
Range = new(shard.StartOffset, shard.EndOffset),
},
};
using (request)
{
using (HttpResponseMessage response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token).ConfigureAwait(false))
{
response.EnsureSuccessStatusCode();
Memory<byte> buffer = new byte[bufferSize];
using (Stream stream = await response.Content.ReadAsStreamAsync(token).ConfigureAwait(false))
{
int streamOffset = 0;
do
{
int bytesRead = await stream.ReadAsync(buffer, token).ConfigureAwait(false);
if (bytesRead <= 0)
{
break;
}
await RandomAccess.WriteAsync(destFileHandle, buffer[..bytesRead], shard.StartOffset + streamOffset, token).ConfigureAwait(false);
streamOffset += bytesRead;
}
while (true);
}
}
}
}
private sealed class Shard
{
public Shard(long startOffset, long endOffset)
{
StartOffset = startOffset;
EndOffset = endOffset;
}
public long StartOffset { get; }
public long EndOffset { get; }
}
}

View File

@@ -5,63 +5,16 @@ using System.IO;
namespace Snap.Hutao.Core.IO;
/// <summary>
/// 流复制器
/// </summary>
internal sealed class StreamCopyWorker
internal sealed class StreamCopyWorker : StreamCopyWorker<StreamCopyStatus>
{
private readonly Stream source;
private readonly Stream destination;
private readonly long totalBytes;
private readonly int bufferSize;
/// <summary>
/// 创建一个新的流复制器
/// </summary>
/// <param name="source">源</param>
/// <param name="destination">目标</param>
/// <param name="totalBytes">总字节</param>
/// <param name="bufferSize">字节尺寸</param>
public StreamCopyWorker(Stream source, Stream destination, long totalBytes, int bufferSize = 81920)
: base(source, destination, totalBytes => new StreamCopyStatus(totalBytes, totalBytes), bufferSize)
{
Verify.Operation(source.CanRead, "Source Stream can't read");
Verify.Operation(destination.CanWrite, "Destination Stream can't write");
this.source = source;
this.destination = destination;
this.totalBytes = totalBytes;
this.bufferSize = bufferSize;
}
/// <summary>
/// 异步复制
/// </summary>
/// <param name="progress">进度</param>
/// <returns>任务</returns>
public async ValueTask CopyAsync(IProgress<StreamCopyStatus> progress)
{
long totalBytesRead = 0;
int bytesRead;
Memory<byte> buffer = new byte[bufferSize];
do
{
bytesRead = await source.ReadAsync(buffer).ConfigureAwait(false);
await destination.WriteAsync(buffer[..bytesRead]).ConfigureAwait(false);
totalBytesRead += bytesRead;
progress.Report(new(totalBytesRead, totalBytes));
}
while (bytesRead > 0);
}
}
/// <summary>
/// 针对特定类型的流复制器
/// </summary>
/// <typeparam name="TStatus">进度类型</typeparam>
[SuppressMessage("", "SA1402")]
internal sealed class StreamCopyWorker<TStatus>
internal class StreamCopyWorker<TStatus>
{
private readonly Stream source;
private readonly Stream destination;

View File

@@ -0,0 +1,24 @@
// Copyright (c) DGP Studio. All rights reserved.
// Licensed under the MIT license.
using System.Net.Http;
using System.Runtime.CompilerServices;
namespace Snap.Hutao.Web.Request.Builder;
internal static class HttpClientExtension
{
[SuppressMessage("", "SH003")]
public static Task<HttpResponseMessage> HeadAsync(this HttpClient httpClient, [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri, HttpCompletionOption completionOption)
{
return httpClient.SendAsync(PrivateCreateRequestMessage(httpClient, HttpMethod.Get, CreateUri(default!, requestUri)), completionOption, CancellationToken.None);
}
// private HttpRequestMessage CreateRequestMessage(HttpMethod method, Uri? uri)
[UnsafeAccessor(UnsafeAccessorKind.Method, Name = "CreateRequestMessage")]
private static extern HttpRequestMessage PrivateCreateRequestMessage(HttpClient httpClient, HttpMethod method, Uri? uri);
// private static Uri? CreateUri(string? uri)
[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "CreateUri")]
private static extern Uri? CreateUri(HttpClient discard, string? uri);
}