text/event-stream handle (#555)

This commit is contained in:
wanghongenpin
2025-11-13 20:58:01 +08:00
parent 6e0a30c1b7
commit 4289eeaf06
8 changed files with 213 additions and 6 deletions

View File

@@ -13,6 +13,7 @@ import 'package:proxypin/network/util/attribute_keys.dart';
import 'package:proxypin/network/util/byte_buf.dart';
import 'package:proxypin/network/util/logger.dart';
import 'package:proxypin/network/util/process_info.dart';
import 'package:proxypin/network/handle/sse_handle.dart';
import '../util/task_queue.dart';
@@ -201,10 +202,47 @@ class ChannelDispatcher extends ChannelHandler<Uint8List> {
remoteChannel.dispatcher.channelHandle(rawCodec, WebSocketChannelHandler(channel, data.request!));
}
/// SSE 处理 (text/event-stream)
void onSseHandle(ChannelContext channelContext, Channel channel, HttpResponse response, List<int>? initialBody) {
Channel remoteChannel = channelContext.getAttribute(channel.id);
channelContext.currentRequest?.response = response;
response.request ??= channelContext.currentRequest;
channelContext.listener?.onResponse(channelContext, response);
remoteChannel.write(channelContext, response);
// Switch to raw streaming: server->client uses SseChannelHandler; client->server just relays
var rawCodec = RawCodec();
channel.dispatcher.channelHandle(rawCodec, SseChannelHandler(remoteChannel, response));
remoteChannel.dispatcher.channelHandle(rawCodec, RelayHandler(channel));
// Flush any initial body bytes that were already read
if (initialBody != null && initialBody.isNotEmpty) {
// Place existing buffered bytes and let handler consume
buffer.add(initialBody);
var body = buffer.bytes;
buffer.clear();
handler.channelRead(channelContext, channel, body);
}
}
void notSupportedForward(ChannelContext channelContext, Channel channel, DecoderResult decodeResult) {
Channel? remoteChannel = channelContext.getAttribute(channel.id);
// If this is an SSE response, switch to SSE streaming mode instead of generic relay
if (decodeResult.data is HttpResponse) {
var response = decodeResult.data as HttpResponse;
if (response.headers.contentType.toLowerCase().startsWith('text/event-stream') && remoteChannel != null) {
logger.d("[$channel] switch to SSE streaming");
onSseHandle(channelContext, channel, response, decodeResult.forward);
return;
}
}
// Fallback: generic relay for unsupported body types
buffer.add(decodeResult.forward ?? []);
relay(channelContext, channel, remoteChannel!);
if (decodeResult.data is HttpResponse) {
var response = decodeResult.data as HttpResponse;
logger.w("[$channel] not supported parse ${response.headers.contentType}");

View File

@@ -0,0 +1,38 @@
import 'dart:typed_data';
import 'package:proxypin/network/channel/channel.dart';
import 'package:proxypin/network/channel/channel_context.dart';
import 'package:proxypin/network/http/http.dart';
import 'package:proxypin/network/http/sse.dart';
import 'package:proxypin/network/http/websocket.dart';
import 'package:proxypin/network/util/logger.dart';
/// SSE (text/event-stream) handler: forwards raw bytes and emits parsed message frames.
class SseChannelHandler extends ChannelHandler<Uint8List> {
final SseDecoder decoder = SseDecoder();
final Channel proxyChannel;
final HttpMessage message; // HttpResponse on server->client, HttpRequest on client->server
SseChannelHandler(this.proxyChannel, this.message);
@override
Future<void> channelRead(ChannelContext channelContext, Channel channel, Uint8List msg) async {
// Always forward the raw bytes first
proxyChannel.writeBytes(msg);
try {
final frames = decoder.feed(msg);
for (final WebSocketFrame frame in frames) {
frame.isFromClient = message is HttpRequest;
message.messages.add(frame);
channelContext.listener?.onMessage(channel, message, frame);
logger.d(
"[${channelContext.clientChannel?.id}] sse channelRead ${frame.payloadLength} ${frame.payloadDataAsString}");
}
} catch (e, stackTrace) {
log.e("sse decode error", error: e, stackTrace: stackTrace);
}
}
}

View File

@@ -30,7 +30,10 @@ enum ContentType {
font,
image,
video,
http;
http,
sse
;
static ContentType valueOf(String name) {
return ContentType.values.firstWhere((element) => element.name == name.toLowerCase(), orElse: () => http);

View File

@@ -43,7 +43,8 @@ abstract class HttpMessage {
"form-data": ContentType.formData,
"image": ContentType.image,
"video": ContentType.video,
"application/json": ContentType.json
"application/json": ContentType.json,
"text/event-stream": ContentType.sse,
};
String protocolVersion;

117
lib/network/http/sse.dart Normal file
View File

@@ -0,0 +1,117 @@
/*
* Server-Sent Events (text/event-stream) incremental decoder
*/
import 'dart:convert';
import 'dart:typed_data';
import 'package:proxypin/network/http/websocket.dart';
/// Parse SSE stream chunks into message frames.
/// We reuse WebSocketFrame as a generic message container so UI and listeners work.
class SseDecoder {
final StringBuffer _lineBuf = StringBuffer();
// current event fields
final StringBuffer _data = StringBuffer();
String? _event;
String? _id;
int? _retry;
/// Feed a chunk of bytes and return zero or more frames assembled.
List<WebSocketFrame> feed(Uint8List bytes) {
final List<WebSocketFrame> frames = [];
// Append decoded text to buffer; allowMalformed to survive split UTF-8 sequences.
_lineBuf.write(utf8.decode(bytes, allowMalformed: true));
while (true) {
final String current = _lineBuf.toString();
final int nl = current.indexOf('\n');
if (nl == -1) break;
String line = current.substring(0, nl);
_lineBuf.clear();
if (nl + 1 < current.length) _lineBuf.write(current.substring(nl + 1));
if (line.endsWith('\r')) line = line.substring(0, line.length - 1);
if (line.isEmpty) {
// End of event: emit if any data collected
if (_data.isNotEmpty) {
String dataValue = _data.toString();
if (dataValue.endsWith('\n')) dataValue = dataValue.substring(0, dataValue.length - 1);
// Build a text frame from the SSE event. Include event/id headers if present as a prefix comment.
final String payloadText = _event == null && _id == null
? dataValue
: _buildLabeledPayload(dataValue, event: _event, id: _id, retry: _retry);
frames.add(_textFrame(payloadText));
}
_resetEventState();
continue;
}
if (line.startsWith(':')) {
// comment line ignore
continue;
}
final int colon = line.indexOf(':');
final String field = (colon == -1) ? line : line.substring(0, colon);
String value = (colon == -1) ? '' : line.substring(colon + 1);
if (value.startsWith(' ')) value = value.substring(1);
switch (field) {
case 'data':
_data.write(value);
_data.write('\n');
break;
case 'event':
_event = value;
break;
case 'id':
_id = value;
break;
case 'retry':
_retry = int.tryParse(value);
break;
default:
// ignore unknown fields
break;
}
}
return frames;
}
void _resetEventState() {
_data.clear();
_event = null;
_id = null;
_retry = null;
}
String _buildLabeledPayload(String data, {String? event, String? id, int? retry}) {
final StringBuffer b = StringBuffer();
if (event != null && event.isNotEmpty) b.writeln('event: $event');
if (id != null && id.isNotEmpty) b.writeln('id: $id');
if (retry != null) b.writeln('retry: $retry');
b.write(data);
return b.toString();
}
WebSocketFrame _textFrame(String text) {
final bytes = utf8.encode(text);
return WebSocketFrame(
fin: true,
opcode: 0x01, // text
mask: false,
payloadLength: bytes.length,
maskingKey: 0,
payloadData: Uint8List.fromList(bytes),
);
}
}

View File

@@ -36,6 +36,7 @@ const contentMap = {
ContentType.text: Icons.text_fields,
ContentType.css: Icons.css,
ContentType.font: Icons.font_download,
ContentType.sse: Icons.stream,
};
Widget getIcon(HttpResponse? response, {Color? color}) {
@@ -54,6 +55,7 @@ Widget getIcon(HttpResponse? response, {Color? color}) {
errorBuilder: (context, error, stackTrace) => Icon(Icons.image, size: 16, color: color ?? Colors.green),
);
}
return SizedBox(
width: 18, child: Icon(contentMap[contentType] ?? Icons.http, size: 16, color: color ?? Colors.green));
}

View File

@@ -446,7 +446,7 @@ class _BodyState extends State<_Body> {
}
Widget _getBody(ViewType type) {
if (message?.isWebSocket == true) {
if (message?.isWebSocket == true || (message?.contentType == ContentType.sse && message?.messages.isNotEmpty == true)) {
List<Widget>? list = message?.messages
.map((e) => Container(
margin: const EdgeInsets.only(top: 2, bottom: 2),

View File

@@ -133,12 +133,20 @@ class NetworkTabState extends State<NetworkTabController> with SingleTickerProvi
@override
Widget build(BuildContext context) {
bool isWebSocket = widget.request.get()?.isWebSocket == true;
tabs[tabs.length - 1] = isWebSocket ? "WebSocket" : 'Cookies';
bool isSse = widget.response.get()?.headers.contentType.toLowerCase().startsWith('text/event-stream') == true;
bool isStreamMessages = isWebSocket || isSse;
if (isSse) {
tabs[tabs.length - 1] = "SSE";
} else if (isWebSocket) {
tabs[tabs.length - 1] = "WebSocket";
} else {
tabs[tabs.length - 1] = 'Cookies';
}
var tabBar = TabBar(
padding: const EdgeInsets.only(bottom: 0),
controller: _tabController,
dividerColor: Theme.of(context).dividerColor.withOpacity(0.15),
dividerColor: Theme.of(context).dividerColor.withValues(alpha: 0.15),
labelPadding: const EdgeInsets.symmetric(horizontal: 10),
tabs: tabs.map((title) => Tab(child: Text(title, style: widget.tabStyle, maxLines: 1))).toList(),
);
@@ -170,7 +178,7 @@ class NetworkTabState extends State<NetworkTabController> with SingleTickerProvi
KeepAliveWrapper(child: request()),
KeepAliveWrapper(child: response()),
SelectionArea(
child: isWebSocket
child: isStreamMessages
? Websocket(widget.request, widget.response)
: Cookies(widget.request, widget.response)),
],