ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

网络编程IO多路复用-服务端代码

2021-05-12 20:32:39  阅读:214  来源: 互联网

标签:selector SelectionKey java 多路复用 Selector IO import selectionKey 服务端


使用Java NIO完成服务端代码的编写,代码写的不完善,本文主要想体现多路复用的几种编程模型和思想。


一、单线程版本

使用单线程+NIO完成服务端代码的编写,并且使用一个Selector注册器。在一个线程中处理ServerSocketChannel的accept、SocketChannel的read、write。

  • Server

创建ServerSocketChannel,并将其注册到Selector中。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 *
 * 使用NIO多路复用处理与客户端的通信
 *
 */
public class Server {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    public Server(int port) throws Exception{
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select()>0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        dispatch(selectionKey);
                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                try {
                    selector.close();
                } catch (Exception ee) {

                }
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        if (selectionKey.isAcceptable()) {
            new Acceptor(serverSocketChannel, selector).accept();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write();
        }
    }

}
  • Acceptor

接收客户端的连接。

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责接收客户端的连接
 *
 */
public class Acceptor {

    private final ServerSocketChannel serverSocketChannel;

    private Selector selector;

    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    public void accept() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
  • ReadHandler

接收客户端发送的内容。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责读取客户端数据,即read
 *
 */
public class ReadHandler {

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    public void read() {
        try {
            // 读取客户端数据
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            byteBuffer.flip();
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+new String(byteBuffer.array()));
            // 注册写事件,给客户端回消息
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // 这边如果发生了异常,要调用cancel方法,取消该selectionkey的监听
            // 比如客户端端口连接,如果缺少此行代码,控制台会一直打印错误
            selectionKey.cancel();
            e.printStackTrace();
        }
    }

}
  • WriteHandler

向客户端发送内容。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责向客户端响应数据,即wirte
 *
 */
public class WriteHandler {

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    public void write() {
        try {
            // 发送数据给客户端
            String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
            ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
            socketChannel.write(buffer);
            // 注册读事件,继续等待客户端的信息
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            selectionKey.cancel();
            e.printStackTrace();
        }
    }

}

运行服务端:

/**
 *
 * 启动server服务
 */
public class Main {

    public static void main(String[] args) throws Exception {
        Server server = new Server(8800);
        server.start();
    }
}

二、多线程线程池版本

使用线程池在不同的线程中处理ServerSocketChannel的accept、SocketChannel的read、write。仍然使用一个Selector。

  • Server

43行中让当前线程暂停500,目的是让dispatch方法逻辑执行完成之后再执行iterator.remove()将当前的SelectionKey从集合中移除。因为dispatch中使用了线程池异步处理,可能会存在代码先执行了iterator.remove(),后执行dispatch逻辑,这样会导致错误。(这种sleep方式处理是存在问题的)

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * 使用NIO多路复用处理与客户端的通信
 *
 */
public class Server {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);

    public Server(int port) throws Exception{
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select()>0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        dispatch(selectionKey);
                        try {
                            Thread.sleep(500);
                        } catch (Exception e) {}

                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                try {
                    selector.close();
                } catch (Exception ee) {

                }
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        if (selectionKey.isAcceptable()) {
            executorService.execute(new Acceptor(serverSocketChannel, selector));
        } else if (selectionKey.isReadable()) {
            executorService.execute(new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey));
        } else if (selectionKey.isWritable()) {
            executorService.execute(new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey));
        }
    }

}
  • Acceptor

实现Runnable接口,重写run方法。

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责接收客户端的连接
 *
 */
public class Acceptor implements Runnable{

    private final ServerSocketChannel serverSocketChannel;

    private Selector selector;

    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    public void accept() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        accept();
    }
}
  • ReadHandler

实现Runnable接口,重写run方法。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责读取客户端数据,即read
 *
 */
public class ReadHandler implements Runnable{

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    public void read() {
        try {
            // 读取客户端数据
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            byteBuffer.flip();
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+new String(byteBuffer.array()));
            // 注册写事件,给客户端回消息
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // 这边如果发生了异常,要调用cancel方法,取消该selectionkey的监听
            // 比如客户端端口连接,如果缺少此行代码,控制台会一直打印错误
            selectionKey.cancel();
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        read();
    }
}
  • WriteHandler

实现Runnable接口,重写run方法。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 *
 * 负责向客户端响应数据,即wirte
 *
 */
public class WriteHandler implements Runnable{

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    public void write() {
        try {
            // 发送数据给客户端
            String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
            ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
            socketChannel.write(buffer);
            // 注册读事件,继续等待客户端的信息
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            selectionKey.cancel();
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        write();
    }
}

三、多Selector(主从)

一个Selector负责接收客户端的连接(ServerSocketChannel#accept),多个Selector负责客户端的读写数据(SocketChannel#read、SokcetChannel#write)。

  • Server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * 使用NIO多路复用处理与客户端的通信
 *
 */
public class Server {

    /**
     * 主selector,负责监听accept事件,处理客户端的连接
     */
    private final Selector masterSelector;

    /**
     * 存放从selector集合
     */
    private Selector[] selectors;

    private final ServerSocketChannel serverSocketChannel;

    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);

    public Server(int port) throws Exception{
        masterSelector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(masterSelector, SelectionKey.OP_ACCEPT);

        // 创建两个从selector
        Selector subSelector1 = Selector.open();
        Selector subSelector2 = Selector.open();
        selectors = new Selector[]{subSelector1, subSelector2};

    }

    /**
     * 每个Selector放进单独的线程的进行循环,
     * 避免select阻塞互相影响。
     */
    public void start() {
        executorService.execute(()->loop(masterSelector));
        for (Selector selector : selectors) {
            executorService.execute(()->loop(selector));
        }
    }

    /**
     * 因为Selector的select是阻塞方法,多个Selector在单线程中循环,
     * 会造成互相等待的影响,所以每个Selector都另起一个线程。
     * @param selector
     */
    private void loop(Selector selector) {
        while (!Thread.interrupted()) {
            try {
                // 这里最多阻塞1秒则直接返回
                int select = selector.select(1000);
                if (select>0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        dispatch(selectionKey);
                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                try {
                    selector.close();
                } catch (Exception ee) {

                }
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        if (selectionKey.isAcceptable()) {
            // 从子selector中随机取出一个作为参数
            Random random = new Random();
            int i = random.nextInt(selectors.length);
            Selector subSelector = selectors[i];
            new Acceptor(serverSocketChannel, subSelector).accept();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write();
        }
    }

}

一主二从,一个主Selector负责accept客户端连接。两个从Selector负责与客户端read、write。

其他类的代码同单线程版本。

标签:selector,SelectionKey,java,多路复用,Selector,IO,import,selectionKey,服务端
来源: https://blog.csdn.net/weixin_50518271/article/details/116720184

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有