Netty实例

2022/8/8 23:24:36

本文主要是介绍Netty实例,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

  本文netty的实例,主要包括几个知识点

  1 如何拿到channel (网上的大多数例子都是写在handler 的 channelActive 方法中的)

  2  自定义协议格式,使用 LengthFieldBasedFrameDecoder 解码

  3  使用CompleteFuture 模拟同步发送(一次发送收到回复后才进行下一次发送)

 

   协议体 

package com.chinaunicom.asset.server.assetpublish.Netty;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class RpcMessage {
    //请求id
    private Integer requestId;
    // body长度
    private Integer length;

    private String body;

    public RpcMessage(Integer requestId, Integer length, String body) {
        this.requestId = requestId;
        this.body = body;
        this.length = length;
    }

}

  客户端 

public class FirstNettyClient {
    private final String host;
    private final int port;

    public static Map<Integer, CompletableFuture> getFutureMap() {
        return futureMap;
    }

    public void setFutureMap(Map<Integer, CompletableFuture> futureMap) {
        this.futureMap = futureMap;
    }

    private static Map<Integer, CompletableFuture> futureMap;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    private Channel channel;

    public FirstNettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {

        futureMap = new HashMap<>();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                EventLoopGroup group = new NioEventLoopGroup();
                try {
                    //创建Bootstrap
                    Bootstrap b = new Bootstrap();
                    //指定EventLoopGroup以处理客户端事件;需要适用于NIO的实现
                    b.group(group)
                            //适用于NIO传输的Channel类型
                            .channel(NioSocketChannel.class)
                            //设置服务器的连接地址
                            .remoteAddress(new InetSocketAddress(host, port))
                            .handler(new ChannelInitializer<SocketChannel>() {
                                //在创建Channel时,向ChannelPipeline中添加一个FirstNettyClientHandler实例
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {

                                    ch.pipeline().addLast(new MyClientProtocolDecoder(16384,4,4, 0,0, true));
                                    ch.pipeline().addLast(new FirstNettyClientHandler());
                                }
                            });
                    //连接到远程节点,阻塞等待直到连接完成
                    ChannelFuture f = b.connect().sync();
                    //阻塞,直到Channel关闭
                    channel = f.channel();

                    f.channel().closeFuture().sync();
                } catch (Exception e) {

                } finally {
                    //关闭线程池并且释放所有的资源
                    try {
                        group.shutdownGracefully().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread startup = new Thread(runnable);
        startup.start();

    }

    public static void main(String[] args) throws Exception {
        FirstNettyClient nettyClient = new FirstNettyClient("127.0.0.1",6666);
        nettyClient.start();
        Thread.sleep(2000);

        byte[] requestArray = int2Bytes(110808);

        byte[] content = "int2Bytes(110808)".getBytes();

        byte[] lengthArray = int2Bytes(content.length);

        CompletableFuture completableFuture = new CompletableFuture();
        nettyClient.getFutureMap().put(110808, completableFuture);

        nettyClient.getChannel().writeAndFlush(Unpooled.copiedBuffer(requestArray, lengthArray, content));

        completableFuture.get(5, TimeUnit.SECONDS);

        System.out.println("next is ready");
    }

    public static byte[] int2Bytes(int integer) {
        byte[] targets = new byte[4];
        targets[3] = (byte) (integer & 0xFF);
        targets[2] = (byte) (integer >> 8 & 0xFF);
        targets[1] = (byte) (integer >> 16 & 0xFF);
        targets[0] = (byte) (integer >> 24 & 0xFF);
        return targets;
    }
}

  其中main也可以单独提出来成为一个方法

  FirstNettyClientHandler 

public class FirstNettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
    /**
     * 和服务器连接建立后将被调用
     */
    public void channelActive(ChannelHandlerContext ctx) {
        //当被通知Channel是活跃的时候,发送一条消息
        System.out.println("client active");
//        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
        // ctx.writeAndFlush("Netty rocks!");

    }
    /**
     * 当从服务器接收到一条消息时被调用
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, RpcMessage in) throws Exception {

        CompletableFuture completableFuture = FirstNettyClient.getFutureMap().get(in.getRequestId());

        System.out.println("Client received:" + in.getRequestId() + " " + in.getBody());

        System.out.println("Client future done:");
        completableFuture.complete(null);
    }
    /**
     * 引发异常时会被调用
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("Can't connect server");
        //发生异常时,记录错误并关闭Channel
        cause.printStackTrace();
        ctx.close();
    }
}

  解码器 MyClientProtocolDecoder 

public class MyClientProtocolDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * @param maxFrameLength  帧的最大长度
     * @param lengthFieldOffset length字段偏移的地址
     * @param lengthFieldLength length字段所占的字节长
     * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
     * @param initialBytesToStrip 解析时候跳过多少个长度
     * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
     */

    public MyClientProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {

        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);

    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
        in = (ByteBuf) super.decode(ctx,in);

        if(in == null){
            return null;
        }
//        if(in.readableBytes()<HEADER_SIZE){
//            throw new Exception("字节数不足");
//        }
        //读取requestId字段
        int requestId = in.readInt();
        //读取length字段
        int length = in.readInt();

        if(in.readableBytes()!=length){
            throw new Exception("标记的长度不符合实际长度");
        }
        //读取body
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);

        return new RpcMessage(requestId, length, new String(bytes,"UTF-8"));

    }
}

  服务端

public class FirstNettyServer {
    private final int port;

    public FirstNettyServer(int port) {
        this.port = port;
    }

    @SneakyThrows
    public static void main(String[] args) {
        //启动服务器
        new FirstNettyServer(6666).start();
    }

    public void start() throws Exception {
        final FirstNettyServerHandler serverHandler = new FirstNettyServerHandler();
        final MyProtocolDecoder myProtocolDecoder = new MyProtocolDecoder(16384,4,4, 0,0, true);

        //1. 创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2.创建Server-Bootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    //3.指定所使用的NIO传输Channel
                    .channel(NioServerSocketChannel.class)
                    //4.使用指定的端口设置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5.添加一个FirstNettyServerHandler到子Channel的ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(myProtocolDecoder);
                            channel.pipeline().addLast(serverHandler);
                        }
                    });
            //6. 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
            ChannelFuture f = b.bind().sync();
            //7. 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
            f.channel().closeFuture().sync();
        } finally {
            //8. 关闭EventLoopGroup,释放所有资源
            group.shutdownGracefully().sync();
        }
    }
}

  



这篇关于Netty实例的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程