request repeat support http2 (#388)(#51)

This commit is contained in:
wanghongenpin
2025-05-08 21:46:28 +08:00
parent 9048cb1324
commit 94197e9817
9 changed files with 152 additions and 33 deletions

View File

@@ -87,12 +87,23 @@ class Channel {
return secureSocket;
}
///服务端ssl握手
serverSecureSocket(SecureSocket secureSocket, ChannelContext channelContext) {
_socket = secureSocket;
_socket.done.then((value) => isOpen = false);
dispatcher.listen(this, channelContext);
}
Future<SecureSocket> startSecureSocket(ChannelContext channelContext,
{String? host, List<String>? supportedProtocols}) async {
SecureSocket secureSocket = await SecureSocket.secure(socket,
host: host, supportedProtocols: supportedProtocols, onBadCertificate: (certificate) => true);
_socket = secureSocket;
_socket.done.then((value) => isOpen = false);
return secureSocket;
}
String? get selectedProtocol => isSsl ? (_socket as SecureSocket).selectedProtocol : null;
///是否是ssl链接

View File

@@ -1,14 +1,15 @@
import 'package:proxypin/network/channel/channel.dart';
import 'package:proxypin/network/channel/host_port.dart';
import 'package:proxypin/network/http/codec.dart';
import 'package:proxypin/network/http/h2/setting.dart';
import 'package:proxypin/network/http/http.dart';
import 'package:proxypin/network/http/http_client.dart';
import 'package:proxypin/network/util/attribute_keys.dart';
import 'package:proxypin/network/util/process_info.dart';
import 'package:proxypin/utils/lang.dart';
import '../bin/listener.dart';
import 'network.dart';
///
class ChannelContext {
@@ -23,18 +24,26 @@ class ChannelContext {
EventListener? listener;
//http2 stream
final Map<int, Pair<HttpRequest, ValueWrap<HttpResponse>>> _streams = {};
final Map<int, Pair<HttpRequest?, HttpResponse?>> _streams = {};
ChannelContext();
//创建服务端连接
Future<Channel> connectServerChannel(HostAndPort hostAndPort, ChannelHandler channelHandler) async {
serverChannel = await HttpClients.startConnect(hostAndPort, channelHandler, this);
serverChannel = await startConnect(hostAndPort, channelHandler, this);
putAttribute(clientChannel!.id, serverChannel);
putAttribute(serverChannel!.id, clientChannel);
return serverChannel!;
}
/// 建立连接
static Future<Channel> startConnect(
HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext) async {
var client = Client()..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler));
return client.connect(hostAndPort, channelContext);
}
T? getAttribute<T>(String key) {
if (!_attributes.containsKey(key)) {
return null;
@@ -66,15 +75,20 @@ class ChannelContext {
HttpRequest? putStreamRequest(int streamId, HttpRequest request) {
var old = _streams[streamId]?.key;
_streams[streamId] = Pair(request, ValueWrap());
_streams[streamId] = Pair(request, null);
return old;
}
void putStreamResponse(int streamId, HttpResponse response) {
var stream = _streams[streamId]!;
stream.key.response = response;
response.request = stream.key;
stream.value.set(response);
var pair = _streams[streamId];
if (pair == null) {
pair = Pair(null, response);
_streams[streamId] = pair;
}
pair.key?.response = response;
response.request = pair.key;
pair.value = response;
}
HttpRequest? getStreamRequest(int streamId) {
@@ -82,7 +96,7 @@ class ChannelContext {
}
HttpResponse? getStreamResponse(int streamId) {
return _streams[streamId]?.value.get();
return _streams[streamId]?.value;
}
void removeStream(int streamId) {

View File

@@ -19,10 +19,10 @@ import 'dart:typed_data';
import 'package:proxypin/network/channel/channel_context.dart';
import 'package:proxypin/network/channel/host_port.dart';
import 'package:proxypin/network/http/body_reader.dart';
import 'package:proxypin/network/http/parse/body_reader.dart';
import 'package:proxypin/network/http/constants.dart';
import 'package:proxypin/network/http/h2/h2_codec.dart';
import 'package:proxypin/network/http/http_parser.dart';
import 'package:proxypin/network/http/parse/http_parser.dart';
import 'package:proxypin/network/util/byte_buf.dart';
import 'http.dart';
@@ -155,7 +155,7 @@ abstract class HttpCodec<T extends HttpMessage> implements Codec<T, T> {
@override
List<int> encode(T message) {
if (message.streamId != null) {
if (message.protocolVersion == "HTTP/2") {
return getH2Codec().encode(message);
}

View File

@@ -24,6 +24,7 @@ import 'package:proxypin/network/http/http.dart';
import 'package:proxypin/network/util/byte_buf.dart';
import 'package:proxypin/network/util/logger.dart';
import '../../util/byte_utils.dart';
import 'frame.dart';
import 'hpack/hpack.dart';
@@ -55,7 +56,7 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
while (byteBuf.isReadable()) {
DecoderResult<T> result = DecoderResult<T>(isDone: false);
FrameHeader? frameHeader = FrameReader._readFrameHeader(byteBuf);
FrameHeader? frameHeader = FrameReader.readFrameHeader(byteBuf);
// logger.d("${frameHeader?.streamIdentifier} frame ${frameHeader?.length} ${byteBuf.readableBytes()}");
if (frameHeader == null) {
@@ -115,6 +116,15 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
SettingHandler.handleSettingsFrame(channelContext, frameHeader, ByteBuf(framePayload));
result.forward = List.from(frameHeader.encode())..addAll(framePayload);
return result;
case FrameType.goaway:
var lastStreamId = readInt32(framePayload, 0);
var errorCode = readInt32(framePayload, 4);
var debugData = viewOrSublist(framePayload, 8, frameHeader.length - 8);
logger.i(
"h2 goaway streamId: ${frameHeader.streamIdentifier} lastStreamId: $lastStreamId errorCode: $errorCode debugData: ${String.fromCharCodes(debugData)}");
result.forward = List.from(frameHeader.encode())..addAll(framePayload);
return result;
default:
//其他帧类型 原文转发
result.forward = List.from(frameHeader.encode())..addAll(framePayload);
@@ -318,7 +328,7 @@ class Http2RequestDecoder extends Http2Codec<HttpRequest> {
message.headers.forEach((key, values) {
for (var value in values) {
headers.add(Header.ascii(key, value));
headers.add(Header.ascii(key.toLowerCase(), value));
}
});
return headers;
@@ -331,7 +341,10 @@ class Http2ResponseDecoder extends Http2Codec<HttpResponse> {
ChannelContext channelContext, FrameHeader frameHeader, Map<String, List<String>> headers) {
var httpResponse = HttpResponse(HttpStatus.valueOf(int.parse(headers[':status']!.first)),
protocolVersion: headers[":version"]?.firstOrNull ?? 'HTTP/2');
httpResponse.requestId = channelContext.getStreamRequest(frameHeader.streamIdentifier)!.requestId;
final requestId = channelContext.getStreamRequest(frameHeader.streamIdentifier)?.requestId;
if (requestId != null) {
httpResponse.requestId = requestId;
}
channelContext.putStreamResponse(frameHeader.streamIdentifier, httpResponse);
return httpResponse;
}
@@ -367,7 +380,7 @@ class FrameReader {
return readBytes;
}
static FrameHeader? _readFrameHeader(ByteBuf data) {
static FrameHeader? readFrameHeader(ByteBuf data) {
if (data.readableBytes() < headerLength) {
return null;
}

View File

@@ -234,6 +234,8 @@ class HttpRequest extends HttpMessage {
if (uri != null && !uri.startsWith('/')) {
request.hostAndPort = HostAndPort.of(uri);
}
request.hostAndPort ??= hostAndPort;
request.streamId = streamId;
request.body = body;
return request;
}

View File

@@ -16,33 +16,45 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:proxypin/network/channel/channel_context.dart';
import 'package:proxypin/network/channel/host_port.dart';
import 'package:proxypin/network/http/h2/h2_codec.dart';
import 'package:proxypin/network/http/http.dart';
import 'package:proxypin/network/http/http_headers.dart';
import 'package:proxypin/network/channel/network.dart';
import 'package:proxypin/network/util/byte_buf.dart';
import 'package:proxypin/network/util/logger.dart';
import 'package:proxypin/network/util/system_proxy.dart';
import 'package:proxy_manager/proxy_manager.dart';
import '../channel/channel.dart';
import 'codec.dart';
import 'h2/frame.dart';
class HttpClients {
/// 建立连接
static Future<Channel> startConnect(
HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext) async {
var client = Client()
..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler));
static Future<Channel> startConnect(HostAndPort hostAndPort, {Duration timeout = const Duration(seconds: 3)}) {
String host = hostAndPort.host;
//说明支持ipv6
if (host.startsWith("[") && host.endsWith(']')) {
host = host.substring(1, host.length - 1);
}
return client.connect(hostAndPort, channelContext);
return Socket.connect(host, hostAndPort.port, timeout: timeout).then((socket) {
if (socket.address.type != InternetAddressType.unix) {
socket.setOption(SocketOption.tcpNoDelay, true);
}
return Channel(socket);
});
}
///代理建立连接
static Future<Channel> proxyConnect(HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext,
static Future<Channel> proxyConnect(
HttpRequest request, HostAndPort hostAndPort, ChannelHandler<HttpResponse> handler, ChannelContext channelContext,
{ProxyInfo? proxyInfo}) async {
var client = Client()
..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler));
var client = Client()..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler));
if (proxyInfo == null) {
var proxyTypes = hostAndPort.isSsl() ? ProxyTypes.https : ProxyTypes.http;
@@ -57,9 +69,19 @@ class HttpClients {
}
if (hostAndPort.isSsl()) {
await channel.secureSocket(channelContext, host: hostAndPort.host);
await channel.startSecureSocket(channelContext,
host: hostAndPort.host, supportedProtocols: request.protocolVersion == "HTTP/2" ? ["h2"] : null);
if (channelContext.serverChannel?.selectedProtocol == "h2") {
await Http2ClientHandler(handler).listen(channel, channelContext);
} else {
channel.dispatcher.listen(channel, channelContext);
}
}
logger.d(
"request ${hostAndPort.host}:${hostAndPort.port} ${request.protocolVersion} ${channelContext.serverChannel?.selectedProtocol}");
return channel;
}
@@ -115,7 +137,8 @@ class HttpClients {
var httpResponseHandler = HttpResponseHandler();
var client = Client()
..initChannel((channel) => channel.dispatcher.handle(HttpResponseCodec(), HttpRequestCodec(), httpResponseHandler));
..initChannel(
(channel) => channel.dispatcher.handle(HttpResponseCodec(), HttpRequestCodec(), httpResponseHandler));
ChannelContext channelContext = ChannelContext();
Channel channel = await client.connect(hostAndPort, channelContext);
@@ -133,24 +156,79 @@ class HttpClients {
request.headers.host = '${uri.host}${uri.hasPort ? ':${uri.port}' : ''}';
} catch (_) {}
}
request.protocolVersion = 'HTTP/1.1';
ChannelContext channelContext = ChannelContext();
var httpResponseHandler = HttpResponseHandler();
request.hostAndPort ??= HostAndPort.of(request.requestUrl);
Channel channel =
await proxyConnect(proxyInfo: proxyInfo, request.hostAndPort!, httpResponseHandler, channelContext);
await proxyConnect(request, proxyInfo: proxyInfo, request.hostAndPort!, httpResponseHandler, channelContext);
if (!request.uri.startsWith("/")) {
Uri? uri = request.requestUri;
request = request.copy(uri: '${uri!.path}${uri.hasQuery ? '?${uri.query}' : ''}');
}
if (channel.selectedProtocol == 'h2') {
request.headers.remove(HttpHeaders.HOST);
request.streamId = 1;
}
await channel.write(request);
return httpResponseHandler.getResponse(timeout).whenComplete(() => channel.close());
}
}
class Http2ClientHandler {
static const int FLAG_ACK = 0x1;
ByteBuf byteBuf = ByteBuf();
Http2ResponseDecoder decoder = Http2ResponseDecoder();
final ChannelHandler<HttpResponse> handler;
Http2ClientHandler(this.handler);
Future<void> listen(Channel channel, ChannelContext channelContext) async {
channel.dispatcher.encoder = Http2RequestDecoder();
channel.dispatcher.decoder = decoder;
channel.socket.listen((data) => onData(channelContext, channel, data),
onError: (error, trace) => handler.exceptionCaught(channelContext, channel, error, trace: trace),
onDone: () => handler.channelInactive(channelContext, channel));
channel.writeBytes("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".codeUnits);
}
onData(ChannelContext channelContext, Channel channel, Uint8List data) {
byteBuf.add(data);
var decodeResult = decoder.decode(channelContext, byteBuf);
if (!decodeResult.isDone) {
return;
}
byteBuf.clearRead();
if (decodeResult.forward != null) {
ByteBuf buffer = ByteBuf(decodeResult.forward);
FrameHeader? frameHeader = FrameReader.readFrameHeader(buffer);
logger.d("Http2ClientHandler forward ${frameHeader?.type}");
if (frameHeader?.type == FrameType.settings) {
// 检查是否需要发送 ACK
if (frameHeader!.hasAckFlag == false) {
// 发送带有 ACK 标志的 SETTINGS 帧
var ackFrame = FrameHeader(0, FrameType.settings, FLAG_ACK, 0);
channel.writeBytes(ackFrame.encode());
}
}
return;
}
handler.channelRead(channelContext, channel, decodeResult.data!);
}
}
class HttpResponseHandler extends ChannelHandler<HttpResponse> {
Completer<HttpResponse> _completer = Completer<HttpResponse>();

View File

@@ -20,8 +20,8 @@ import 'dart:typed_data';
import 'package:proxypin/network/http/constants.dart';
import 'package:proxypin/network/http/http.dart';
import '../../utils/num.dart';
import 'codec.dart';
import '../../../utils/num.dart';
import '../codec.dart';
class Result {
final bool isDone;

View File

@@ -119,6 +119,7 @@ extension StringEnhance on String {
return this;
}
}
String fixAutoLines() {
return Characters(this).join('\u{200B}');
}
@@ -142,8 +143,8 @@ extension StringEnhance on String {
}
class Pair<K, V> {
final K key;
final V value;
final K? key;
V? value;
Pair(this.key, this.value);
}