叮当猫咪
长话短说:如果使用默认 HTTP 处理程序达到了使 GC 成为问题所需的量,那么无论如何都应该使用代理服务器进行扩展。在 Norman 的回答之后,我最终尝试了一个非常简单的可共享 HTTP 编解码器/聚合器 POC,看看这是否值得追求。我的可共享解码器距离RFC 7230还很远,但它满足了我当前项目的足够要求。然后我使用httperf和VisualVM来了解 GC 负载差异的概念。经过我的努力,GC 率仅降低了 10%。换句话说,这确实没有多大区别。唯一真正值得赞赏的效果是,与使用打包的非共享 HTTP 编解码器 + 聚合器和我的可共享编解码器相比,在运行 1000 个请求/秒时,我的错误减少了 5%。只有当我每秒执行 1000 个请求并持续超过 10 秒时才会出现这种情况。最后我不会去追求它。为了获得通过使用代理服务器可以解决的微小好处而将其变成完全兼容 HTTP 的解码器所需的时间根本不值得。出于参考目的,这里是我尝试过的组合可共享解码器/聚合器:import java.util.concurrent.ConcurrentHashMap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelId;import io.netty.channel.ChannelInboundHandlerAdapter;@Sharablepublic class SharableHttpDecoder extends ChannelInboundHandlerAdapter { private static final ConcurrentHashMap<ChannelId, SharableHttpRequest> MAP = new ConcurrentHashMap<ChannelId, SharableHttpRequest>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; ChannelId channelId = ctx.channel().id(); SharableHttpRequest request = MAP.get(channelId); if (request == null) { request = new SharableHttpRequest(buf); buf.release(); if (request.isComplete()) { ctx.fireChannelRead(request); } else { MAP.put(channelId, request); } } else { request.append(buf); buf.release(); if (request.isComplete()) { ctx.fireChannelRead(request); } } } else { // TODO send 501 System.out.println("WTF is this? " + msg.getClass().getName()); ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Unable to handle request on channel: " + ctx.channel().id().asLongText()); cause.printStackTrace(System.err); // TODO send 500 ctx.fireExceptionCaught(cause); ctx.close(); } }解码器创建的用于在管道上处理的结果对象:import java.util.Arrays;import java.util.HashMap;import io.netty.buffer.ByteBuf;public class SharableHttpRequest{ private static final byte SPACE = 32; private static final byte COLON = 58; private static final byte CARRAIGE_RETURN = 13; private HashMap<Header,String> myHeaders; private Method myMethod; private String myPath; private byte[] myBody; private int myIndex = 0; public SharableHttpRequest(ByteBuf buf) { try { myHeaders = new HashMap<Header,String>(); final StringBuilder builder = new StringBuilder(8); parseRequestLine(buf, builder); while (parseNextHeader(buf, builder)); parseBody(buf); } catch (Exception e) { e.printStackTrace(System.err); } } public String getHeader(Header name) { return myHeaders.get(name); } public Method getMethod() { return myMethod; } public String getPath() { return myPath; } public byte[] getBody() { return myBody; } public boolean isComplete() { return myIndex >= myBody.length; } public void append(ByteBuf buf) { int length = buf.readableBytes(); buf.getBytes(buf.readerIndex(), myBody, myIndex, length); myIndex += length; } private void parseRequestLine(ByteBuf buf, StringBuilder builder) { int idx = buf.readerIndex(); int end = buf.writerIndex(); for (; idx < end; ++idx) { byte next = buf.getByte(idx); // break on CR if (next == CARRAIGE_RETURN) { break; } // we need the method else if (myMethod == null) { if (next == SPACE) { myMethod = Method.fromBuilder(builder); builder.delete(0, builder.length()); builder.ensureCapacity(100); } else { builder.append((char) next); } } // we need the path else if (myPath == null) { if (next == SPACE) { myPath = builder.toString(); builder.delete(0, builder.length()); } else { builder.append((char) next); } } // don't need the version right now } idx += 2; // skip line endings buf.readerIndex(idx); } private boolean parseNextHeader(ByteBuf buf, StringBuilder builder) { Header header = null; int idx = buf.readerIndex(); int end = buf.writerIndex(); for (; idx < end; ++idx) { byte next = buf.getByte(idx); // break on CR if (next == CARRAIGE_RETURN) { if (header != Header.UNHANDLED) { myHeaders.put(header,builder.toString()); builder.delete(0, builder.length()); } break; } else if (header == null) { // we have the full header name if (next == COLON) { header = Header.fromBuilder(builder); builder.delete(0, builder.length()); } // get header name as lower case for mapping purposes else { builder.append(next > 64 && next < 91 ? (char) ( next | 32 ) : (char) next); } } // we don't care about some headers else if (header == Header.UNHANDLED) { continue; } // skip initial spaces else if (builder.length() == 0 && next == SPACE) { continue; } // get the header value else { builder.append((char) next); } } idx += 2; // skip line endings buf.readerIndex(idx); if (buf.getByte(idx) == CARRAIGE_RETURN) { idx += 2; // skip line endings buf.readerIndex(idx); return false; } else { return true; } } private void parseBody(ByteBuf buf) { int length = buf.readableBytes(); if (length == 0) { myBody = new byte[0]; myIndex = 1; } else { System.out.println("Content-Length: " + myHeaders.get(Header.CONTENT_LENGTH)); if (myHeaders.get(Header.CONTENT_LENGTH) != null) { int totalLength = Integer.valueOf(myHeaders.get(Header.CONTENT_LENGTH)); myBody = new byte[totalLength]; buf.getBytes(buf.readerIndex(), myBody, myIndex, length); myIndex += length; } // TODO handle chunked } } public enum Method { GET(new char[]{71, 69, 84}), POST(new char[]{80, 79, 83, 84}), UNHANDLED(new char[]{}); // could be expanded if needed private char[] chars; Method(char[] chars) { this.chars = chars; } public static Method fromBuilder(StringBuilder builder) { for (Method method : Method.values()) { if (method.chars.length == builder.length()) { boolean match = true; for (int i = 0; i < builder.length(); i++) { if (method.chars[i] != builder.charAt(i)) { match = false; break; } } if (match) { return method; } } } return null; } } public enum Header { HOST(new char[]{104, 111, 115, 116}), CONNECTION(new char[]{99, 111, 110, 110, 101, 99, 116, 105, 111, 110}), IF_MODIFIED_SINCE(new char[]{ 105, 102, 45, 109, 111, 100, 105, 102, 105, 101, 100, 45, 115, 105, 110, 99, 101}), COOKIE(new char[]{99, 111, 111, 107, 105, 101}), CONTENT_LENGTH(new char[]{ 99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104}), UNHANDLED(new char[]{}); // could be expanded if needed private char[] chars; Header(char[] chars) { this.chars = chars; } public static Header fromBuilder(StringBuilder builder) { for (Header header : Header.values()) { if (header.chars.length == builder.length()) { boolean match = true; for (int i = 0; i < builder.length(); i++) { if (header.chars[i] != builder.charAt(i)) { match = false; break; } } if (match) { return header; } } } return UNHANDLED; } }}用于测试的简单处理程序:import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;@Sharablepublic class SharableHttpHandler extends SimpleChannelInboundHandler<SharableHttpRequest>{ @Override protected void channelRead0(ChannelHandlerContext ctx, SharableHttpRequest msg) throws Exception { String message = "HTTP/1.1 200 OK\r\n" + "Content-type: text/html\r\n" + "Content-length: 42\r\n\r\n" + "<html><body>Hello sharedworld</body><html>"; ByteBuf buffer = ctx.alloc().buffer(message.length()); buffer.writeCharSequence(message, CharsetUtil.UTF_8); ChannelFuture flushPromise = ctx.channel().writeAndFlush(buffer); flushPromise.addListener(ChannelFutureListener.CLOSE); if (!flushPromise.isSuccess()) { flushPromise.cause().printStackTrace(System.err); } } }使用这些可共享处理程序的完整管道:import tests.SharableHttpDecoder;import tests.SharableHttpHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;public class ServerPipeline extends ChannelInitializer<SocketChannel>{ private final SharableHttpDecoder decoder = new SharableHttpDecoder(); private final SharableHttpHandler handler = new SharableHttpHandler(); @Override public void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(decoder); pipeline.addLast(handler); }}上面的内容针对这个(更常见的)非共享管道进行了测试:import static io.netty.handler.codec.http.HttpResponseStatus.OK;import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.DefaultFullHttpResponse;import io.netty.handler.codec.http.FullHttpRequest;import io.netty.handler.codec.http.FullHttpResponse;import io.netty.handler.codec.http.HttpHeaderNames;import io.netty.handler.codec.http.HttpHeaderValues;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.HttpUtil;import io.netty.util.CharsetUtil;public class ServerPipeline extends ChannelInitializer<SocketChannel>{ @Override public void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new UnsharedHttpHandler()); } class UnsharedHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { String message = "<html><body>Hello sharedworld</body><html>"; ByteBuf buffer = ctx.alloc().buffer(message.length()); buffer.writeCharSequence(message.toString(), CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buffer); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); HttpUtil.setContentLength(response, response.content().readableBytes()); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ChannelFuture flushPromise = ctx.writeAndFlush(response); flushPromise.addListener(ChannelFutureListener.CLOSE); } }}