diff --git a/lib/network/channel/channel_dispatcher.dart b/lib/network/channel/channel_dispatcher.dart index e510c4a..b9beacf 100644 --- a/lib/network/channel/channel_dispatcher.dart +++ b/lib/network/channel/channel_dispatcher.dart @@ -13,6 +13,7 @@ import 'package:proxypin/network/util/attribute_keys.dart'; import 'package:proxypin/network/util/byte_buf.dart'; import 'package:proxypin/network/util/logger.dart'; import 'package:proxypin/network/util/process_info.dart'; +import 'package:proxypin/network/handle/sse_handle.dart'; import '../util/task_queue.dart'; @@ -201,10 +202,47 @@ class ChannelDispatcher extends ChannelHandler { remoteChannel.dispatcher.channelHandle(rawCodec, WebSocketChannelHandler(channel, data.request!)); } + /// SSE 处理 (text/event-stream) + void onSseHandle(ChannelContext channelContext, Channel channel, HttpResponse response, List? initialBody) { + Channel remoteChannel = channelContext.getAttribute(channel.id); + channelContext.currentRequest?.response = response; + response.request ??= channelContext.currentRequest; + channelContext.listener?.onResponse(channelContext, response); + + remoteChannel.write(channelContext, response); + + // Switch to raw streaming: server->client uses SseChannelHandler; client->server just relays + var rawCodec = RawCodec(); + channel.dispatcher.channelHandle(rawCodec, SseChannelHandler(remoteChannel, response)); + remoteChannel.dispatcher.channelHandle(rawCodec, RelayHandler(channel)); + + // Flush any initial body bytes that were already read + if (initialBody != null && initialBody.isNotEmpty) { + // Place existing buffered bytes and let handler consume + buffer.add(initialBody); + var body = buffer.bytes; + buffer.clear(); + handler.channelRead(channelContext, channel, body); + } + } + void notSupportedForward(ChannelContext channelContext, Channel channel, DecoderResult decodeResult) { Channel? remoteChannel = channelContext.getAttribute(channel.id); + + // If this is an SSE response, switch to SSE streaming mode instead of generic relay + if (decodeResult.data is HttpResponse) { + var response = decodeResult.data as HttpResponse; + if (response.headers.contentType.toLowerCase().startsWith('text/event-stream') && remoteChannel != null) { + logger.d("[$channel] switch to SSE streaming"); + onSseHandle(channelContext, channel, response, decodeResult.forward); + return; + } + } + + // Fallback: generic relay for unsupported body types buffer.add(decodeResult.forward ?? []); relay(channelContext, channel, remoteChannel!); + if (decodeResult.data is HttpResponse) { var response = decodeResult.data as HttpResponse; logger.w("[$channel] not supported parse ${response.headers.contentType}"); diff --git a/lib/network/handle/sse_handle.dart b/lib/network/handle/sse_handle.dart new file mode 100644 index 0000000..dd4a7ff --- /dev/null +++ b/lib/network/handle/sse_handle.dart @@ -0,0 +1,38 @@ +import 'dart:typed_data'; + +import 'package:proxypin/network/channel/channel.dart'; +import 'package:proxypin/network/channel/channel_context.dart'; +import 'package:proxypin/network/http/http.dart'; +import 'package:proxypin/network/http/sse.dart'; +import 'package:proxypin/network/http/websocket.dart'; +import 'package:proxypin/network/util/logger.dart'; + +/// SSE (text/event-stream) handler: forwards raw bytes and emits parsed message frames. +class SseChannelHandler extends ChannelHandler { + final SseDecoder decoder = SseDecoder(); + + final Channel proxyChannel; + final HttpMessage message; // HttpResponse on server->client, HttpRequest on client->server + + SseChannelHandler(this.proxyChannel, this.message); + + @override + Future channelRead(ChannelContext channelContext, Channel channel, Uint8List msg) async { + // Always forward the raw bytes first + proxyChannel.writeBytes(msg); + + try { + final frames = decoder.feed(msg); + for (final WebSocketFrame frame in frames) { + frame.isFromClient = message is HttpRequest; + message.messages.add(frame); + channelContext.listener?.onMessage(channel, message, frame); + logger.d( + "[${channelContext.clientChannel?.id}] sse channelRead ${frame.payloadLength} ${frame.payloadDataAsString}"); + } + } catch (e, stackTrace) { + log.e("sse decode error", error: e, stackTrace: stackTrace); + } + } +} + diff --git a/lib/network/http/content_type.dart b/lib/network/http/content_type.dart index a59036b..ff68e44 100644 --- a/lib/network/http/content_type.dart +++ b/lib/network/http/content_type.dart @@ -30,7 +30,10 @@ enum ContentType { font, image, video, - http; + http, + sse + + ; static ContentType valueOf(String name) { return ContentType.values.firstWhere((element) => element.name == name.toLowerCase(), orElse: () => http); diff --git a/lib/network/http/http.dart b/lib/network/http/http.dart index 9e7409d..2575058 100644 --- a/lib/network/http/http.dart +++ b/lib/network/http/http.dart @@ -43,7 +43,8 @@ abstract class HttpMessage { "form-data": ContentType.formData, "image": ContentType.image, "video": ContentType.video, - "application/json": ContentType.json + "application/json": ContentType.json, + "text/event-stream": ContentType.sse, }; String protocolVersion; diff --git a/lib/network/http/sse.dart b/lib/network/http/sse.dart new file mode 100644 index 0000000..361bd39 --- /dev/null +++ b/lib/network/http/sse.dart @@ -0,0 +1,117 @@ +/* + * Server-Sent Events (text/event-stream) incremental decoder + */ + +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:proxypin/network/http/websocket.dart'; + +/// Parse SSE stream chunks into message frames. +/// We reuse WebSocketFrame as a generic message container so UI and listeners work. +class SseDecoder { + final StringBuffer _lineBuf = StringBuffer(); + + // current event fields + final StringBuffer _data = StringBuffer(); + String? _event; + String? _id; + int? _retry; + + /// Feed a chunk of bytes and return zero or more frames assembled. + List feed(Uint8List bytes) { + final List frames = []; + + // Append decoded text to buffer; allowMalformed to survive split UTF-8 sequences. + _lineBuf.write(utf8.decode(bytes, allowMalformed: true)); + + while (true) { + final String current = _lineBuf.toString(); + final int nl = current.indexOf('\n'); + if (nl == -1) break; + + String line = current.substring(0, nl); + _lineBuf.clear(); + if (nl + 1 < current.length) _lineBuf.write(current.substring(nl + 1)); + + if (line.endsWith('\r')) line = line.substring(0, line.length - 1); + + if (line.isEmpty) { + // End of event: emit if any data collected + if (_data.isNotEmpty) { + String dataValue = _data.toString(); + if (dataValue.endsWith('\n')) dataValue = dataValue.substring(0, dataValue.length - 1); + + // Build a text frame from the SSE event. Include event/id headers if present as a prefix comment. + final String payloadText = _event == null && _id == null + ? dataValue + : _buildLabeledPayload(dataValue, event: _event, id: _id, retry: _retry); + + frames.add(_textFrame(payloadText)); + } + _resetEventState(); + continue; + } + + if (line.startsWith(':')) { + // comment line – ignore + continue; + } + + final int colon = line.indexOf(':'); + final String field = (colon == -1) ? line : line.substring(0, colon); + String value = (colon == -1) ? '' : line.substring(colon + 1); + if (value.startsWith(' ')) value = value.substring(1); + + switch (field) { + case 'data': + _data.write(value); + _data.write('\n'); + break; + case 'event': + _event = value; + break; + case 'id': + _id = value; + break; + case 'retry': + _retry = int.tryParse(value); + break; + default: + // ignore unknown fields + break; + } + } + + return frames; + } + + void _resetEventState() { + _data.clear(); + _event = null; + _id = null; + _retry = null; + } + + String _buildLabeledPayload(String data, {String? event, String? id, int? retry}) { + final StringBuffer b = StringBuffer(); + if (event != null && event.isNotEmpty) b.writeln('event: $event'); + if (id != null && id.isNotEmpty) b.writeln('id: $id'); + if (retry != null) b.writeln('retry: $retry'); + b.write(data); + return b.toString(); + } + + WebSocketFrame _textFrame(String text) { + final bytes = utf8.encode(text); + return WebSocketFrame( + fin: true, + opcode: 0x01, // text + mask: false, + payloadLength: bytes.length, + maskingKey: 0, + payloadData: Uint8List.fromList(bytes), + ); + } +} + diff --git a/lib/ui/component/utils.dart b/lib/ui/component/utils.dart index 011aac0..d854225 100644 --- a/lib/ui/component/utils.dart +++ b/lib/ui/component/utils.dart @@ -36,6 +36,7 @@ const contentMap = { ContentType.text: Icons.text_fields, ContentType.css: Icons.css, ContentType.font: Icons.font_download, + ContentType.sse: Icons.stream, }; Widget getIcon(HttpResponse? response, {Color? color}) { @@ -54,6 +55,7 @@ Widget getIcon(HttpResponse? response, {Color? color}) { errorBuilder: (context, error, stackTrace) => Icon(Icons.image, size: 16, color: color ?? Colors.green), ); } + return SizedBox( width: 18, child: Icon(contentMap[contentType] ?? Icons.http, size: 16, color: color ?? Colors.green)); } diff --git a/lib/ui/content/body.dart b/lib/ui/content/body.dart index ece07c5..97d81d4 100644 --- a/lib/ui/content/body.dart +++ b/lib/ui/content/body.dart @@ -446,7 +446,7 @@ class _BodyState extends State<_Body> { } Widget _getBody(ViewType type) { - if (message?.isWebSocket == true) { + if (message?.isWebSocket == true || (message?.contentType == ContentType.sse && message?.messages.isNotEmpty == true)) { List? list = message?.messages .map((e) => Container( margin: const EdgeInsets.only(top: 2, bottom: 2), diff --git a/lib/ui/content/panel.dart b/lib/ui/content/panel.dart index c955259..be04296 100644 --- a/lib/ui/content/panel.dart +++ b/lib/ui/content/panel.dart @@ -133,12 +133,20 @@ class NetworkTabState extends State with SingleTickerProvi @override Widget build(BuildContext context) { bool isWebSocket = widget.request.get()?.isWebSocket == true; - tabs[tabs.length - 1] = isWebSocket ? "WebSocket" : 'Cookies'; + bool isSse = widget.response.get()?.headers.contentType.toLowerCase().startsWith('text/event-stream') == true; + bool isStreamMessages = isWebSocket || isSse; + if (isSse) { + tabs[tabs.length - 1] = "SSE"; + } else if (isWebSocket) { + tabs[tabs.length - 1] = "WebSocket"; + } else { + tabs[tabs.length - 1] = 'Cookies'; + } var tabBar = TabBar( padding: const EdgeInsets.only(bottom: 0), controller: _tabController, - dividerColor: Theme.of(context).dividerColor.withOpacity(0.15), + dividerColor: Theme.of(context).dividerColor.withValues(alpha: 0.15), labelPadding: const EdgeInsets.symmetric(horizontal: 10), tabs: tabs.map((title) => Tab(child: Text(title, style: widget.tabStyle, maxLines: 1))).toList(), ); @@ -170,7 +178,7 @@ class NetworkTabState extends State with SingleTickerProvi KeepAliveWrapper(child: request()), KeepAliveWrapper(child: response()), SelectionArea( - child: isWebSocket + child: isStreamMessages ? Websocket(widget.request, widget.response) : Cookies(widget.request, widget.response)), ],