From 44161857b1987dc709c2e50467be622412ac7fa3 Mon Sep 17 00:00:00 2001 From: wanghongenpin Date: Fri, 9 May 2025 22:19:04 +0800 Subject: [PATCH] http2 client (#388)(#51) --- lib/network/http/h2/h2_codec.dart | 119 ++++++++++++++++++------------ lib/network/http/h2/setting.dart | 2 +- lib/network/http/http_client.dart | 38 +++++++++- 3 files changed, 109 insertions(+), 50 deletions(-) diff --git a/lib/network/http/h2/h2_codec.dart b/lib/network/http/h2/h2_codec.dart index c70cc22..bccbe50 100644 --- a/lib/network/http/h2/h2_codec.dart +++ b/lib/network/http/h2/h2_codec.dart @@ -21,6 +21,7 @@ import 'package:proxypin/network/channel/channel_context.dart'; import 'package:proxypin/network/http/codec.dart'; import 'package:proxypin/network/http/h2/setting.dart'; import 'package:proxypin/network/http/http.dart'; +import 'package:proxypin/network/http/http_headers.dart'; import 'package:proxypin/network/util/byte_buf.dart'; import 'package:proxypin/network/util/logger.dart'; @@ -36,7 +37,7 @@ abstract class Http2Codec implements Codec { HPackDecoder decoder = HPackDecoder(); - HPackEncoder encoder = HPackEncoder(); + final HPackEncoder _hpackEncoder = HPackEncoder(); T createMessage(ChannelContext channelContext, FrameHeader frameHeader, Map> headers); @@ -51,7 +52,6 @@ abstract class Http2Codec implements Codec { byteBuf.get(byteBuf.readerIndex + 1) == 0x52 && byteBuf.get(byteBuf.readerIndex + 2) == 0x49 && isConnectionPrefacePRI(byteBuf)) { - result.forward = byteBuf.readBytes(connectionPrefacePRI.length); // logger.d( // "Connection Preface ${connectionPrefacePRI.length} ${String.fromCharCodes(result.forward!)} ${byteBuf.readableBytes()}"); @@ -60,11 +60,14 @@ abstract class Http2Codec implements Codec { } } + List? forward = result.forward == null ? null : List.of(result.forward!); + while (byteBuf.isReadable()) { FrameHeader? frameHeader = FrameReader.readFrameHeader(byteBuf); // logger.d( // "frameHeader streamId: ${frameHeader?.streamIdentifier} frame ${frameHeader?.type.name} ${frameHeader?.length} ${byteBuf.readableBytes()}"); if (frameHeader == null) { + result.forward = forward; result.isDone = false; return result; } @@ -73,23 +76,31 @@ abstract class Http2Codec implements Codec { if (framePayload == null) { result.isDone = false; byteBuf.readerIndex -= FrameReader.headerLength; + + result.forward = forward; return result; } var parseResult = parseHttp2Packet(channelContext, frameHeader, framePayload); - if (result.forward != null) { - parseResult.forward = List.of(result.forward!)..addAll(parseResult.forward ?? []); + if (parseResult.forward != null) { + forward ??= []; + forward.addAll(parseResult.forward!); } - return parseResult; + if (parseResult.isDone) { + parseResult.forward = forward; + return parseResult; + } } + result.forward = forward; result.isDone = false; return result; } DecoderResult parseHttp2Packet(ChannelContext channelContext, FrameHeader frameHeader, List framePayload) { var result = DecoderResult(isDone: false); + // logger.d( // "${this is Http2RequestDecoder ? 'request' : 'response'} streamId: ${frameHeader.streamIdentifier} ${frameHeader.type} endHeaders: ${frameHeader.hasEndHeadersFlag} " // "endStream: ${frameHeader.hasEndStreamFlag} ${frameHeader.length}"); @@ -116,6 +127,7 @@ abstract class Http2Codec implements Codec { channelContext.getStreamRequest(frameHeader.streamIdentifier)?.method == HttpMethod.head) { result.isDone = true; } + break; case FrameType.data: //处理DATA帧 @@ -126,12 +138,6 @@ abstract class Http2Codec implements Codec { SettingHandler.handleSettingsFrame(channelContext, frameHeader, ByteBuf(framePayload)); result.forward = List.from(frameHeader.encode())..addAll(framePayload); return result; - case FrameType.windowUpdate: - //处理WINDOW_UPDATE帧 - var windowSizeIncrement = readInt32(framePayload, 0) & 0x7fffffff; - logger.d("h2 windowUpdate streamId: ${frameHeader.streamIdentifier} windowSizeIncrement: $windowSizeIncrement"); - result.forward = List.from(frameHeader.encode())..addAll(framePayload); - return result; case FrameType.goaway: var lastStreamId = readInt32(framePayload, 0); var errorCode = readInt32(framePayload, 4); @@ -140,7 +146,6 @@ abstract class Http2Codec implements Codec { "h2 goaway streamId: ${frameHeader.streamIdentifier} lastStreamId: $lastStreamId errorCode: $errorCode debugData: ${String.fromCharCodes(debugData)}"); result.forward = List.from(frameHeader.encode())..addAll(framePayload); return result; - default: //其他帧类型 原文转发 result.forward = List.from(frameHeader.encode())..addAll(framePayload); @@ -165,38 +170,19 @@ abstract class Http2Codec implements Codec { @override Uint8List encode(T data) { var bytesBuilder = BytesBuilder(); - // data.headers.contentLength = data.body?.length ?? 0; + if (data.headers.getInt(HttpHeaders.CONTENT_LENGTH) != null) { + data.headers.set(HttpHeaders.CONTENT_LENGTH.toLowerCase(), "${data.body?.length ?? 0}"); + } + var emptyBody = data.body == null || data.body!.isEmpty; //headers var headers = encodeHeaders(data); - // BytesBuilder headerBlock = BytesBuilder(); - bool firstFrame = true; - var headerBlock = encoder.encode(headers); - // for (var header in headers) { - // //防止出现桢分片导致header分裂 - // if (headerBlock.length + encode.length < maxFrameSize) { - // headerBlock.add(encode); - // continue; - // } - // - // FrameType frameType = firstFrame ? FrameType.headers : FrameType.continuation; - // int flags = frameType == FrameType.headers && emptyBody ? FrameHeader.flagsEndStream : 0; - // firstFrame = false; - // - // _writeFrame(bytesBuilder, frameType, flags, data.streamId!, headerBlock.takeBytes()); - // headerBlock.add(encode); - // } - - FrameType frameType = firstFrame ? FrameType.headers : FrameType.continuation; - int flags = frameType == FrameType.headers && emptyBody ? FrameHeader.flagsEndStream : 0; - flags |= FrameHeader.flagsEndHeaders; - - _writeFrame(bytesBuilder, frameType, flags, data.streamId!, headerBlock); + writeHeadersFrame(bytesBuilder, data.streamId!, headers, endStream: emptyBody); //body - if (data.body?.isNotEmpty == true) { + if (!emptyBody) { var payload = data.body!; while (payload.length > maxFrameSize) { var chunkSize = min(maxFrameSize, payload.length); @@ -206,18 +192,52 @@ abstract class Http2Codec implements Codec { } _writeFrame(bytesBuilder, FrameType.data, FrameHeader.flagsEndStream, data.streamId!, payload); - } else if (frameType != FrameType.headers && emptyBody) { - //如果没有body,发送一个空的DATA帧 - _writeFrame(bytesBuilder, FrameType.data, FrameHeader.flagsEndStream, data.streamId!, []); } return bytesBuilder.takeBytes(); } + void writeHeadersFrame( + BytesBuilder bytesBuilder, + int streamId, + List
headers, { + StreamSetting? setting, + bool endStream = true, + }) { + var fragment = _hpackEncoder.encode(headers); + var maxSize = setting?.maxFrameSize ?? maxFrameSize; + + if (fragment.length < maxSize) { + int flags = FrameHeader.flagsEndHeaders; + if (endStream) { + flags |= FrameHeader.flagsEndStream; + } + _writeFrame(bytesBuilder, FrameType.headers, flags, streamId, fragment); + } else { + var chunk = fragment.sublist(0, maxSize); + fragment = fragment.sublist(maxSize); + + _writeFrame(bytesBuilder, FrameType.headers, 0, streamId, chunk); + + while (fragment.length > maxSize) { + var chunk = fragment.sublist(0, maxSize); + fragment = fragment.sublist(maxSize); + _writeFrame(bytesBuilder, FrameType.continuation, 0, streamId, chunk); + } + + _writeFrame(bytesBuilder, FrameType.continuation, FrameHeader.flagsEndHeaders, streamId, fragment); + + if (endStream) { + //如果没有body,发送一个空的DATA帧 + _writeFrame(bytesBuilder, FrameType.data, FrameHeader.flagsEndStream, streamId, []); + } + } + } + void _writeFrame(BytesBuilder bytesBuilder, FrameType type, int flag, int streamId, List payload) { FrameHeader frameHeader = FrameHeader(payload.length, type, flag, streamId); - logger.d( - "${this is Http2RequestDecoder ? 'request' : 'response'} _writeFrame streamId: ${frameHeader.streamIdentifier} ${frameHeader.type} flags:${frameHeader.flags} endHeaders: ${frameHeader.hasEndHeadersFlag} endStream: ${frameHeader.hasEndStreamFlag} ${payload.length}"); + // logger.d( + // "${this is Http2RequestDecoder ? 'request' : 'response'} _writeFrame streamId: ${frameHeader.streamIdentifier} ${frameHeader.type} flags:${frameHeader.flags} endHeaders: ${frameHeader.hasEndHeadersFlag} endStream: ${frameHeader.hasEndStreamFlag} ${payload.length}"); bytesBuilder.add(frameHeader.encode()); bytesBuilder.add(payload); @@ -269,11 +289,18 @@ abstract class Http2Codec implements Codec { int? weight; //如果帧头部有PRIORITY标志位,则需要读取优先级信息 if (frameHeader.hasPriorityFlag) { - //读取优先级信息 + if (payload.readableBytes() < 5) { + throw Exception("Invalid PRIORITY frame: insufficient data"); + } + + // 读取依赖流 ID 和权重 int dependency = payload.readInt(); - exclusiveDependency = (dependency & 0x80000000) == 0x80000000; - streamDependency = dependency & 0x7fffffff; - weight = payload.readByte(); // weight + exclusiveDependency = (dependency & 0x80000000) != 0; // 检查最高位是否为 1 + streamDependency = dependency & 0x7FFFFFFF; // 获取低 31 位 + weight = payload.readByte(); // 读取权重 + + logger.d( + "PRIORITY frame parsed: exclusive=$exclusiveDependency, streamDependency=$streamDependency, weight=$weight"); } var headerBlockLength = payload.length - payload.readerIndex - padLength; diff --git a/lib/network/http/h2/setting.dart b/lib/network/http/h2/setting.dart index c8c8f26..8646f5f 100644 --- a/lib/network/http/h2/setting.dart +++ b/lib/network/http/h2/setting.dart @@ -58,7 +58,7 @@ class SettingHandler { while (payload.isReadable()) { int identifier = payload.readShort(); int value = payload.readInt(); - // logger.d("SettingHandler.handleSettingsFrame identifier=$identifier value=$value"); + // print("SettingHandler.handleSettingsFrame identifier=$identifier value=$value"); // Handle the setting based on its identifier switch (identifier) { diff --git a/lib/network/http/http_client.dart b/lib/network/http/http_client.dart index 8b28c9d..3a8048a 100644 --- a/lib/network/http/http_client.dart +++ b/lib/network/http/http_client.dart @@ -26,6 +26,7 @@ import 'package:proxypin/network/http/http.dart'; import 'package:proxypin/network/http/http_headers.dart'; import 'package:proxypin/network/channel/network.dart'; import 'package:proxypin/network/util/byte_buf.dart'; +import 'package:proxypin/network/util/byte_utils.dart'; import 'package:proxypin/network/util/logger.dart'; import 'package:proxypin/network/util/system_proxy.dart'; import 'package:proxy_manager/proxy_manager.dart'; @@ -33,6 +34,7 @@ import 'package:proxy_manager/proxy_manager.dart'; import '../channel/channel.dart'; import 'codec.dart'; import 'h2/frame.dart'; +import 'h2/setting.dart'; class HttpClients { static Future startConnect(HostAndPort hostAndPort, {Duration timeout = const Duration(seconds: 3)}) { @@ -70,11 +72,11 @@ class HttpClients { if (hostAndPort.isSsl()) { await channel.startSecureSocket(channelContext, - host: hostAndPort.host, supportedProtocols: request.protocolVersion == "HTTP/2" ? ["h2"] : null); - + host: hostAndPort.host, supportedProtocols: request.protocolVersion == "HTTP/2" ? ["h2", "http/1.1"] : null); if (channelContext.serverChannel?.selectedProtocol == "h2") { await Http2ClientHandler(handler).listen(channel, channelContext); } else { + request.protocolVersion = "HTTP/1.1"; channel.dispatcher.listen(channel, channelContext); } } @@ -195,7 +197,37 @@ class Http2ClientHandler { onError: (error, trace) => handler.exceptionCaught(channelContext, channel, error, trace: trace), onDone: () => handler.channelInactive(channelContext, channel)); - channel.writeBytes(Http2Codec.connectionPrefacePRI); + await channel.writeBytes(Http2Codec.connectionPrefacePRI); + + //发送setting + final streamSetting = StreamSetting(); + streamSetting.headTableSize = 65536; + streamSetting.initialWindowSize = 1048896; + streamSetting.maxHeaderListSize = 262144; + + var payload = Uint8List(6 * 3); + int offset = 0; + // SETTINGS_HEADER_TABLE_SIZE + setInt16(payload, offset, 1); + offset += 2; + setInt32(payload, offset, streamSetting.headTableSize); + offset += 4; + + // SETTINGS_INITIAL_WINDOW_SIZE + setInt16(payload, offset, 4); + offset += 2; + setInt32(payload, offset, streamSetting.initialWindowSize); + offset += 4; + + //SETTINGS_MAX_FRAME_SIZE + setInt16(payload, offset, 6); + offset += 2; + setInt32(payload, offset, streamSetting.maxHeaderListSize!); + offset += 4; + + var settingFrame = FrameHeader(payload.length, FrameType.settings, 0, 0); + var buffer = settingFrame.encode()..addAll(payload); + await channel.writeBytes(buffer); } onData(ChannelContext channelContext, Channel channel, Uint8List data) {