diff --git a/lib/network/channel/channel.dart b/lib/network/channel/channel.dart index efca2a9..a1777f9 100644 --- a/lib/network/channel/channel.dart +++ b/lib/network/channel/channel.dart @@ -87,12 +87,23 @@ class Channel { return secureSocket; } + ///服务端ssl握手 serverSecureSocket(SecureSocket secureSocket, ChannelContext channelContext) { _socket = secureSocket; _socket.done.then((value) => isOpen = false); dispatcher.listen(this, channelContext); } + Future startSecureSocket(ChannelContext channelContext, + {String? host, List? supportedProtocols}) async { + SecureSocket secureSocket = await SecureSocket.secure(socket, + host: host, supportedProtocols: supportedProtocols, onBadCertificate: (certificate) => true); + + _socket = secureSocket; + _socket.done.then((value) => isOpen = false); + return secureSocket; + } + String? get selectedProtocol => isSsl ? (_socket as SecureSocket).selectedProtocol : null; ///是否是ssl链接 diff --git a/lib/network/channel/channel_context.dart b/lib/network/channel/channel_context.dart index dedcf4f..1683966 100644 --- a/lib/network/channel/channel_context.dart +++ b/lib/network/channel/channel_context.dart @@ -1,14 +1,15 @@ import 'package:proxypin/network/channel/channel.dart'; import 'package:proxypin/network/channel/host_port.dart'; +import 'package:proxypin/network/http/codec.dart'; import 'package:proxypin/network/http/h2/setting.dart'; import 'package:proxypin/network/http/http.dart'; -import 'package:proxypin/network/http/http_client.dart'; import 'package:proxypin/network/util/attribute_keys.dart'; import 'package:proxypin/network/util/process_info.dart'; import 'package:proxypin/utils/lang.dart'; import '../bin/listener.dart'; +import 'network.dart'; /// class ChannelContext { @@ -23,18 +24,26 @@ class ChannelContext { EventListener? listener; //http2 stream - final Map>> _streams = {}; + final Map> _streams = {}; ChannelContext(); //创建服务端连接 Future connectServerChannel(HostAndPort hostAndPort, ChannelHandler channelHandler) async { - serverChannel = await HttpClients.startConnect(hostAndPort, channelHandler, this); + serverChannel = await startConnect(hostAndPort, channelHandler, this); putAttribute(clientChannel!.id, serverChannel); putAttribute(serverChannel!.id, clientChannel); return serverChannel!; } + /// 建立连接 + static Future startConnect( + HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext) async { + var client = Client()..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler)); + + return client.connect(hostAndPort, channelContext); + } + T? getAttribute(String key) { if (!_attributes.containsKey(key)) { return null; @@ -66,15 +75,20 @@ class ChannelContext { HttpRequest? putStreamRequest(int streamId, HttpRequest request) { var old = _streams[streamId]?.key; - _streams[streamId] = Pair(request, ValueWrap()); + _streams[streamId] = Pair(request, null); return old; } void putStreamResponse(int streamId, HttpResponse response) { - var stream = _streams[streamId]!; - stream.key.response = response; - response.request = stream.key; - stream.value.set(response); + var pair = _streams[streamId]; + if (pair == null) { + pair = Pair(null, response); + _streams[streamId] = pair; + } + + pair.key?.response = response; + response.request = pair.key; + pair.value = response; } HttpRequest? getStreamRequest(int streamId) { @@ -82,7 +96,7 @@ class ChannelContext { } HttpResponse? getStreamResponse(int streamId) { - return _streams[streamId]?.value.get(); + return _streams[streamId]?.value; } void removeStream(int streamId) { diff --git a/lib/network/http/codec.dart b/lib/network/http/codec.dart index ba1f13d..c8fd375 100644 --- a/lib/network/http/codec.dart +++ b/lib/network/http/codec.dart @@ -19,10 +19,10 @@ import 'dart:typed_data'; import 'package:proxypin/network/channel/channel_context.dart'; import 'package:proxypin/network/channel/host_port.dart'; -import 'package:proxypin/network/http/body_reader.dart'; +import 'package:proxypin/network/http/parse/body_reader.dart'; import 'package:proxypin/network/http/constants.dart'; import 'package:proxypin/network/http/h2/h2_codec.dart'; -import 'package:proxypin/network/http/http_parser.dart'; +import 'package:proxypin/network/http/parse/http_parser.dart'; import 'package:proxypin/network/util/byte_buf.dart'; import 'http.dart'; @@ -155,7 +155,7 @@ abstract class HttpCodec implements Codec { @override List encode(T message) { - if (message.streamId != null) { + if (message.protocolVersion == "HTTP/2") { return getH2Codec().encode(message); } diff --git a/lib/network/http/h2/h2_codec.dart b/lib/network/http/h2/h2_codec.dart index 493a2d2..5722bca 100644 --- a/lib/network/http/h2/h2_codec.dart +++ b/lib/network/http/h2/h2_codec.dart @@ -24,6 +24,7 @@ import 'package:proxypin/network/http/http.dart'; import 'package:proxypin/network/util/byte_buf.dart'; import 'package:proxypin/network/util/logger.dart'; +import '../../util/byte_utils.dart'; import 'frame.dart'; import 'hpack/hpack.dart'; @@ -55,7 +56,7 @@ abstract class Http2Codec implements Codec { while (byteBuf.isReadable()) { DecoderResult result = DecoderResult(isDone: false); - FrameHeader? frameHeader = FrameReader._readFrameHeader(byteBuf); + FrameHeader? frameHeader = FrameReader.readFrameHeader(byteBuf); // logger.d("${frameHeader?.streamIdentifier} frame ${frameHeader?.length} ${byteBuf.readableBytes()}"); if (frameHeader == null) { @@ -115,6 +116,15 @@ abstract class Http2Codec implements Codec { SettingHandler.handleSettingsFrame(channelContext, frameHeader, ByteBuf(framePayload)); result.forward = List.from(frameHeader.encode())..addAll(framePayload); return result; + case FrameType.goaway: + var lastStreamId = readInt32(framePayload, 0); + var errorCode = readInt32(framePayload, 4); + var debugData = viewOrSublist(framePayload, 8, frameHeader.length - 8); + logger.i( + "h2 goaway streamId: ${frameHeader.streamIdentifier} lastStreamId: $lastStreamId errorCode: $errorCode debugData: ${String.fromCharCodes(debugData)}"); + result.forward = List.from(frameHeader.encode())..addAll(framePayload); + return result; + default: //其他帧类型 原文转发 result.forward = List.from(frameHeader.encode())..addAll(framePayload); @@ -318,7 +328,7 @@ class Http2RequestDecoder extends Http2Codec { message.headers.forEach((key, values) { for (var value in values) { - headers.add(Header.ascii(key, value)); + headers.add(Header.ascii(key.toLowerCase(), value)); } }); return headers; @@ -331,7 +341,10 @@ class Http2ResponseDecoder extends Http2Codec { ChannelContext channelContext, FrameHeader frameHeader, Map> headers) { var httpResponse = HttpResponse(HttpStatus.valueOf(int.parse(headers[':status']!.first)), protocolVersion: headers[":version"]?.firstOrNull ?? 'HTTP/2'); - httpResponse.requestId = channelContext.getStreamRequest(frameHeader.streamIdentifier)!.requestId; + final requestId = channelContext.getStreamRequest(frameHeader.streamIdentifier)?.requestId; + if (requestId != null) { + httpResponse.requestId = requestId; + } channelContext.putStreamResponse(frameHeader.streamIdentifier, httpResponse); return httpResponse; } @@ -367,7 +380,7 @@ class FrameReader { return readBytes; } - static FrameHeader? _readFrameHeader(ByteBuf data) { + static FrameHeader? readFrameHeader(ByteBuf data) { if (data.readableBytes() < headerLength) { return null; } diff --git a/lib/network/http/http.dart b/lib/network/http/http.dart index 7ccbeef..8c951a5 100644 --- a/lib/network/http/http.dart +++ b/lib/network/http/http.dart @@ -234,6 +234,8 @@ class HttpRequest extends HttpMessage { if (uri != null && !uri.startsWith('/')) { request.hostAndPort = HostAndPort.of(uri); } + request.hostAndPort ??= hostAndPort; + request.streamId = streamId; request.body = body; return request; } diff --git a/lib/network/http/http_client.dart b/lib/network/http/http_client.dart index 0134402..03d7aa1 100644 --- a/lib/network/http/http_client.dart +++ b/lib/network/http/http_client.dart @@ -16,33 +16,45 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; import 'package:proxypin/network/channel/channel_context.dart'; import 'package:proxypin/network/channel/host_port.dart'; +import 'package:proxypin/network/http/h2/h2_codec.dart'; import 'package:proxypin/network/http/http.dart'; import 'package:proxypin/network/http/http_headers.dart'; import 'package:proxypin/network/channel/network.dart'; +import 'package:proxypin/network/util/byte_buf.dart'; +import 'package:proxypin/network/util/logger.dart'; import 'package:proxypin/network/util/system_proxy.dart'; import 'package:proxy_manager/proxy_manager.dart'; import '../channel/channel.dart'; import 'codec.dart'; +import 'h2/frame.dart'; class HttpClients { - /// 建立连接 - static Future startConnect( - HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext) async { - var client = Client() - ..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler)); + static Future startConnect(HostAndPort hostAndPort, {Duration timeout = const Duration(seconds: 3)}) { + String host = hostAndPort.host; + //说明支持ipv6 + if (host.startsWith("[") && host.endsWith(']')) { + host = host.substring(1, host.length - 1); + } - return client.connect(hostAndPort, channelContext); + return Socket.connect(host, hostAndPort.port, timeout: timeout).then((socket) { + if (socket.address.type != InternetAddressType.unix) { + socket.setOption(SocketOption.tcpNoDelay, true); + } + return Channel(socket); + }); } ///代理建立连接 - static Future proxyConnect(HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext, + static Future proxyConnect( + HttpRequest request, HostAndPort hostAndPort, ChannelHandler handler, ChannelContext channelContext, {ProxyInfo? proxyInfo}) async { - var client = Client() - ..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler)); + var client = Client()..initChannel((channel) => channel.dispatcher.channelHandle(HttpClientCodec(), handler)); if (proxyInfo == null) { var proxyTypes = hostAndPort.isSsl() ? ProxyTypes.https : ProxyTypes.http; @@ -57,9 +69,19 @@ class HttpClients { } if (hostAndPort.isSsl()) { - await channel.secureSocket(channelContext, host: hostAndPort.host); + await channel.startSecureSocket(channelContext, + host: hostAndPort.host, supportedProtocols: request.protocolVersion == "HTTP/2" ? ["h2"] : null); + + if (channelContext.serverChannel?.selectedProtocol == "h2") { + await Http2ClientHandler(handler).listen(channel, channelContext); + } else { + channel.dispatcher.listen(channel, channelContext); + } } + logger.d( + "request ${hostAndPort.host}:${hostAndPort.port} ${request.protocolVersion} ${channelContext.serverChannel?.selectedProtocol}"); + return channel; } @@ -115,7 +137,8 @@ class HttpClients { var httpResponseHandler = HttpResponseHandler(); var client = Client() - ..initChannel((channel) => channel.dispatcher.handle(HttpResponseCodec(), HttpRequestCodec(), httpResponseHandler)); + ..initChannel( + (channel) => channel.dispatcher.handle(HttpResponseCodec(), HttpRequestCodec(), httpResponseHandler)); ChannelContext channelContext = ChannelContext(); Channel channel = await client.connect(hostAndPort, channelContext); @@ -133,24 +156,79 @@ class HttpClients { request.headers.host = '${uri.host}${uri.hasPort ? ':${uri.port}' : ''}'; } catch (_) {} } - request.protocolVersion = 'HTTP/1.1'; ChannelContext channelContext = ChannelContext(); var httpResponseHandler = HttpResponseHandler(); request.hostAndPort ??= HostAndPort.of(request.requestUrl); Channel channel = - await proxyConnect(proxyInfo: proxyInfo, request.hostAndPort!, httpResponseHandler, channelContext); + await proxyConnect(request, proxyInfo: proxyInfo, request.hostAndPort!, httpResponseHandler, channelContext); if (!request.uri.startsWith("/")) { Uri? uri = request.requestUri; request = request.copy(uri: '${uri!.path}${uri.hasQuery ? '?${uri.query}' : ''}'); } + + if (channel.selectedProtocol == 'h2') { + request.headers.remove(HttpHeaders.HOST); + request.streamId = 1; + } await channel.write(request); return httpResponseHandler.getResponse(timeout).whenComplete(() => channel.close()); } } +class Http2ClientHandler { + static const int FLAG_ACK = 0x1; + + ByteBuf byteBuf = ByteBuf(); + Http2ResponseDecoder decoder = Http2ResponseDecoder(); + final ChannelHandler handler; + + Http2ClientHandler(this.handler); + + Future listen(Channel channel, ChannelContext channelContext) async { + channel.dispatcher.encoder = Http2RequestDecoder(); + channel.dispatcher.decoder = decoder; + + channel.socket.listen((data) => onData(channelContext, channel, data), + onError: (error, trace) => handler.exceptionCaught(channelContext, channel, error, trace: trace), + onDone: () => handler.channelInactive(channelContext, channel)); + + channel.writeBytes("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".codeUnits); + } + + onData(ChannelContext channelContext, Channel channel, Uint8List data) { + byteBuf.add(data); + var decodeResult = decoder.decode(channelContext, byteBuf); + + if (!decodeResult.isDone) { + return; + } + + byteBuf.clearRead(); + + if (decodeResult.forward != null) { + ByteBuf buffer = ByteBuf(decodeResult.forward); + + FrameHeader? frameHeader = FrameReader.readFrameHeader(buffer); + logger.d("Http2ClientHandler forward ${frameHeader?.type}"); + if (frameHeader?.type == FrameType.settings) { + // 检查是否需要发送 ACK + if (frameHeader!.hasAckFlag == false) { + // 发送带有 ACK 标志的 SETTINGS 帧 + var ackFrame = FrameHeader(0, FrameType.settings, FLAG_ACK, 0); + channel.writeBytes(ackFrame.encode()); + } + } + + return; + } + + handler.channelRead(channelContext, channel, decodeResult.data!); + } +} + class HttpResponseHandler extends ChannelHandler { Completer _completer = Completer(); diff --git a/lib/network/http/body_reader.dart b/lib/network/http/parse/body_reader.dart similarity index 98% rename from lib/network/http/body_reader.dart rename to lib/network/http/parse/body_reader.dart index a064a62..97db502 100644 --- a/lib/network/http/body_reader.dart +++ b/lib/network/http/parse/body_reader.dart @@ -20,8 +20,8 @@ import 'dart:typed_data'; import 'package:proxypin/network/http/constants.dart'; import 'package:proxypin/network/http/http.dart'; -import '../../utils/num.dart'; -import 'codec.dart'; +import '../../../utils/num.dart'; +import '../codec.dart'; class Result { final bool isDone; diff --git a/lib/network/http/http_parser.dart b/lib/network/http/parse/http_parser.dart similarity index 100% rename from lib/network/http/http_parser.dart rename to lib/network/http/parse/http_parser.dart diff --git a/lib/utils/lang.dart b/lib/utils/lang.dart index a672280..e6d7a46 100644 --- a/lib/utils/lang.dart +++ b/lib/utils/lang.dart @@ -119,6 +119,7 @@ extension StringEnhance on String { return this; } } + String fixAutoLines() { return Characters(this).join('\u{200B}'); } @@ -142,8 +143,8 @@ extension StringEnhance on String { } class Pair { - final K key; - final V value; + final K? key; + V? value; Pair(this.key, this.value); }