http2 streamDependency (#388)(#51)

This commit is contained in:
wanghongenpin
2025-05-12 19:20:07 +08:00
parent 44161857b1
commit c94ff7eba9
13 changed files with 104 additions and 60 deletions

View File

@@ -66,7 +66,7 @@ abstract interface class Decoder<T> {
/// 编码
abstract interface class Encoder<T> {
List<int> encode(T data);
List<int> encode(ChannelContext channelContext, T data);
}
/// 编解码器
@@ -154,9 +154,9 @@ abstract class HttpCodec<T extends HttpMessage> implements Codec<T, T> {
void initialLine(BytesBuilder buffer, T message);
@override
List<int> encode(T message) {
List<int> encode(ChannelContext channelContext, T message) {
if (message.protocolVersion == "HTTP/2") {
return getH2Codec().encode(message);
return getH2Codec().encode(channelContext, message);
}
BytesBuilder builder = BytesBuilder();
@@ -278,8 +278,8 @@ class HttpServerCodec extends Codec<HttpRequest, HttpResponse> {
}
@override
List<int> encode(HttpResponse data) {
return responseCodec.encode(data);
List<int> encode(ChannelContext channelContext, HttpResponse data) {
return responseCodec.encode(channelContext, data);
}
}
@@ -293,7 +293,7 @@ class HttpClientCodec extends Codec<HttpResponse, HttpRequest> {
}
@override
List<int> encode(HttpRequest data) {
return requestCodec.encode(data);
List<int> encode(ChannelContext channelContext, HttpRequest data) {
return requestCodec.encode(channelContext, data);
}
}

View File

@@ -19,6 +19,7 @@ enum FrameType { data, headers, priority, rstStream, settings, pushPromise, ping
class FrameHeader {
static const flagsEndStream = 0x01;
static const flagsEndHeaders = 0x04;
static const flagsPriority = 0x20;
final int length;
final FrameType type;
@@ -29,7 +30,7 @@ class FrameHeader {
bool get hasPaddedFlag => (flags & 0x08) == 0x08;
bool get hasPriorityFlag => (flags & 0x20) == 0x20;
bool get hasPriorityFlag => (flags & flagsPriority) == flagsPriority;
bool get hasEndHeadersFlag => (flags & flagsEndHeaders) == flagsEndHeaders;
@@ -74,7 +75,7 @@ class HeadersFrame extends Frame {
final bool exclusiveDependency;
final int? streamDependency;
final int? weight;
final List<int> headerBlockFragment;
List<int> headerBlockFragment;
HeadersFrame(super.header, this.padLength, this.exclusiveDependency, this.streamDependency, this.weight,
this.headerBlockFragment);

View File

@@ -108,8 +108,12 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
switch (frameHeader.type) {
case FrameType.headers:
//处理HEADERS帧
_handleHeadersFrame(channelContext, frameHeader, ByteBuf(framePayload));
var headersFrame = _handleHeadersFrame(channelContext, frameHeader, ByteBuf(framePayload));
result.isDone = frameHeader.hasEndStreamFlag && frameHeader.hasEndHeadersFlag;
if (headersFrame.streamDependency != null) {
headersFrame.headerBlockFragment = [];
channelContext.put(frameHeader.streamIdentifier, headersFrame);
}
break;
case FrameType.continuation:
//处理CONTINUATION帧
@@ -168,7 +172,7 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
List<Header> encodeHeaders(T message);
@override
Uint8List encode(T data) {
Uint8List encode(ChannelContext channelContext, T data) {
var bytesBuilder = BytesBuilder();
if (data.headers.getInt(HttpHeaders.CONTENT_LENGTH) != null) {
data.headers.set(HttpHeaders.CONTENT_LENGTH.toLowerCase(), "${data.body?.length ?? 0}");
@@ -179,7 +183,7 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
//headers
var headers = encodeHeaders(data);
writeHeadersFrame(bytesBuilder, data.streamId!, headers, endStream: emptyBody);
writeHeadersFrame(bytesBuilder, channelContext, data.streamId!, headers, endStream: emptyBody);
//body
if (!emptyBody) {
@@ -199,25 +203,26 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
void writeHeadersFrame(
BytesBuilder bytesBuilder,
ChannelContext channelContext,
int streamId,
List<Header> headers, {
StreamSetting? setting,
bool endStream = true,
}) {
var fragment = _hpackEncoder.encode(headers);
var maxSize = setting?.maxFrameSize ?? maxFrameSize;
var maxSize = channelContext.setting?.maxFrameSize ?? maxFrameSize;
if (fragment.length < maxSize) {
int flags = FrameHeader.flagsEndHeaders;
if (endStream) {
flags |= FrameHeader.flagsEndStream;
}
_writeFrame(bytesBuilder, FrameType.headers, flags, streamId, fragment);
_writeHeadersFrame(bytesBuilder, channelContext, flags, streamId, fragment);
} else {
var chunk = fragment.sublist(0, maxSize);
fragment = fragment.sublist(maxSize);
_writeFrame(bytesBuilder, FrameType.headers, 0, streamId, chunk);
_writeHeadersFrame(bytesBuilder, channelContext, 0, streamId, chunk);
while (fragment.length > maxSize) {
var chunk = fragment.sublist(0, maxSize);
@@ -234,8 +239,29 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
}
}
void _writeFrame(BytesBuilder bytesBuilder, FrameType type, int flag, int streamId, List<int> payload) {
FrameHeader frameHeader = FrameHeader(payload.length, type, flag, streamId);
void _writeHeadersFrame(
BytesBuilder bytesBuilder, ChannelContext channelContext, int flags, int streamId, List<int> payload) {
var streamPriority = channelContext.removeStreamDependency(streamId);
if (streamPriority != null) {
flags |= FrameHeader.flagsPriority;
bool exclusive = streamPriority.exclusiveDependency;
int streamDependency = streamPriority.streamDependency!;
payload = [
(exclusive ? 0x80 : 0) | (streamDependency & 0x7FFFFFFF) >> 24,
(streamDependency & 0x00FF0000) >> 16,
(streamDependency & 0x0000FF00) >> 8,
(streamDependency & 0x000000FF),
streamPriority.weight!,
...payload
];
}
_writeFrame(bytesBuilder, FrameType.headers, flags, streamId, payload);
}
void _writeFrame(BytesBuilder bytesBuilder, FrameType type, int flags, int streamId, List<int> payload) {
FrameHeader frameHeader = FrameHeader(payload.length, type, flags, streamId);
// logger.d(
// "${this is Http2RequestDecoder ? 'request' : 'response'} _writeFrame streamId: ${frameHeader.streamIdentifier} ${frameHeader.type} flags:${frameHeader.flags} endHeaders: ${frameHeader.hasEndHeadersFlag} endStream: ${frameHeader.hasEndStreamFlag} ${payload.length}");
@@ -300,7 +326,7 @@ abstract class Http2Codec<T extends HttpMessage> implements Codec<T, T> {
weight = payload.readByte(); // 读取权重
logger.d(
"PRIORITY frame parsed: exclusive=$exclusiveDependency, streamDependency=$streamDependency, weight=$weight");
"PRIORITY frame parsed: streamId:${frameHeader.streamIdentifier} padLength:$padLength exclusive=$exclusiveDependency, streamDependency=$streamDependency, weight=$weight");
}
var headerBlockLength = payload.length - payload.readerIndex - padLength;

View File

@@ -67,7 +67,7 @@ class HttpClients {
var channel = await client.connect(connectHost, channelContext);
if (proxyInfo != null) {
await connectRequest(hostAndPort, channel, proxyInfo: proxyInfo);
await connectRequest(channelContext, hostAndPort, channel, proxyInfo: proxyInfo);
}
if (hostAndPort.isSsl()) {
@@ -88,7 +88,8 @@ class HttpClients {
}
///发起代理连接请求
static Future<Channel> connectRequest(HostAndPort hostAndPort, Channel channel, {ProxyInfo? proxyInfo}) async {
static Future<Channel> connectRequest(ChannelContext channelContext, HostAndPort hostAndPort, Channel channel,
{ProxyInfo? proxyInfo}) async {
ChannelHandler handler = channel.dispatcher.handler;
//代理 发送connect请求
var httpResponseHandler = HttpResponseHandler();
@@ -103,7 +104,7 @@ class HttpClients {
proxyRequest.headers.set(HttpHeaders.PROXY_AUTHORIZATION, 'Basic $auth');
}
await channel.write(proxyRequest);
await channel.write(channelContext, proxyRequest);
var response = await httpResponseHandler.getResponse(const Duration(seconds: 5));
channel.dispatcher.handler = handler;
@@ -144,7 +145,7 @@ class HttpClients {
ChannelContext channelContext = ChannelContext();
Channel channel = await client.connect(hostAndPort, channelContext);
await channel.write(request);
await channel.write(channelContext, request);
return httpResponseHandler.getResponse(timeout).whenComplete(() => channel.close());
}
@@ -175,7 +176,7 @@ class HttpClients {
request.headers.remove(HttpHeaders.HOST);
request.streamId = 1;
}
await channel.write(request);
await channel.write(channelContext, request);
return httpResponseHandler.getResponse(timeout).whenComplete(() => channel.close());
}
}