/**
 * Non-blocking echo server built on NIO (Selector, SelectionKey,
 * ServerSocketChannel, SocketChannel).
 *
 * <p>Listens on 0.0.0.0:8888, accepts any number of clients on a single thread
 * and answers every received message with the same text prefixed by
 * {@code "hello : "}.
 */
public class MyServerSocketChannel {

    /**
     * Opens the selector and the listening channel, then runs the select loop forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Selector sel;
        try {
            sel = Selector.open();
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress("0.0.0.0", 8888));
            ssc.configureBlocking(false);
            // Register the listening channel so the selector reports new connections.
            ssc.register(sel, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            // Without a working selector there is nothing to loop on.
            return;
        }

        while (true) {
            try {
                sel.select();
            } catch (Exception e) {
                // A failing selector will not recover; stop instead of spinning.
                e.printStackTrace();
                return;
            }

            Set<SelectionKey> keys = sel.selectedKeys();
            for (SelectionKey key : keys) {
                try {
                    if (key.isAcceptable()) {
                        acceptClient(sel, key);
                    } else if (key.isReadable()) {
                        echo(key);
                    }
                } catch (Exception e) {
                    // Boundary catch: a failure on one client must not stop the server.
                    disconnect(key);
                }
            }
            // The selector never removes keys from the selected set by itself.
            keys.clear();
        }
    }

    /**
     * Accepts a pending connection and registers it for read events only.
     * OP_WRITE is deliberately not registered: an idle socket is almost always
     * writable, which would make select() return immediately on every call.
     */
    private static void acceptClient(Selector sel, SelectionKey key) throws java.io.IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        if (client == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        System.out.println(getClientInfo(client.socket()) + " : 上线了!");
        client.configureBlocking(false);
        client.register(sel, SelectionKey.OP_READ);
    }

    /**
     * Drains everything currently readable from the client, logs it and sends
     * it back prefixed with "hello : ". Disconnects the client at end-of-stream.
     */
    private static void echo(SelectionKey key) throws java.io.IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        ByteBuffer buf = ByteBuffer.allocate(1024);

        int n;
        // read() returns 0 when nothing more is available and -1 once the peer has
        // closed; only a positive count means there is data to copy.
        while ((n = client.read(buf)) > 0) {
            buf.flip();
            received.write(buf.array(), 0, buf.limit());
            buf.clear();
        }

        if (received.size() > 0) {
            String str = "hello : " + new String(received.toByteArray());
            System.out.println(getClientInfo(client.socket()) + str);
            // Single write attempt: a non-blocking channel may accept fewer bytes than offered.
            client.write(ByteBuffer.wrap(str.getBytes()));
        }

        if (n < 0) {
            disconnect(key);
        }
    }

    /**
     * Cancels the key, closes its channel and reports the client as offline.
     * Never throws, so it is safe to call from an exception handler.
     */
    private static void disconnect(SelectionKey key) {
        key.cancel();
        if (!(key.channel() instanceof SocketChannel)) {
            return;
        }
        SocketChannel client = (SocketChannel) key.channel();
        String info;
        try {
            // Resolve the address before closing the channel.
            info = getClientInfo(client.socket());
        } catch (Exception e) {
            info = "[unknown] ";
        }
        try {
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(info + " : 下线了!");
    }

    /**
     * Formats the remote endpoint of a socket as "[ip:port] ".
     *
     * @param socket the client socket
     * @return the formatted endpoint, or "[unknown] " when no remote address is available
     */
    private static String getClientInfo(Socket socket) {
        InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
        if (addr == null) {
            return "[unknown] ";
        }
        String ip = addr.getAddress().getHostAddress();
        String port = addr.getPort() + "";
        return "[" + ip + ":" + port + "] ";
    }
}
2、客户端
1 /** 2 * 客户端 3 */ 4 public class MyClientSocketChannel { 5 public static void main(String[] args) throws Exception { 6 Selector sel = Selector.open(); //挑选器 7 SocketChannel sc = SocketChannel.open(); //开启通道 8 InetSocketAddress addr = new InetSocketAddress("localhost", 8888);//服务器地址 9 sc.connect(addr); //连接10 11 sc.configureBlocking(false); //*****非阻塞模式12 sc.register(sel, SelectionKey.OP_READ); //注册read事件13 14 new Sender(sc).start(); //开启线程发送消息15 16 //17 ByteBuffer buf = ByteBuffer.allocate(1024);18 //开始挑选19 while(true){20 sel.select();21 ByteArrayOutputStream baos = new ByteArrayOutputStream();22 while(sc.read(buf) != 0){23 buf.flip();24 baos.write(buf.array(),0,buf.limit());25 buf.clear();26 }27 String str = new String(baos.toByteArray());28 System.out.println(str);29 }30 }31 }
3、多线程分支读取数据的输入
/**
 * Sender thread: reads lines from the console and writes each one to the
 * given socket channel.
 */
public class Sender extends Thread {
    private final SocketChannel sc;

    /**
     * @param sc the channel every console line is written to
     */
    public Sender(SocketChannel sc) {
        this.sc = sc;
    }

    /**
     * Forwards console lines until standard input ends or the channel fails.
     * The line terminator is not transmitted.
     */
    @Override
    public void run() {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = br.readLine()) != null) {
                // Wrap the exact bytes of the line: a fixed 1024-byte buffer
                // overflows on longer input.
                ByteBuffer buf = ByteBuffer.wrap(line.getBytes());
                // A non-blocking channel may accept only part of the buffer per call,
                // so keep writing until the whole line has been handed over.
                while (buf.hasRemaining()) {
                    sc.write(buf);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2、
/**
 * Minimal NIO server on localhost:8888: prints whatever a client sends, replies
 * once with "hello world" as soon as the connection is writable, then closes
 * that connection.
 */
public class MyServer {

    /**
     * Runs the accept/read/write select loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (Selector sel = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress("localhost", 8888));

            // Register the listening channel for incoming connections.
            ssc.register(sel, SelectionKey.OP_ACCEPT);

            while (true) {
                sel.select();
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Selected keys must be removed by hand or they are reported again.
                    it.remove();
                    try {
                        handle(sel, key);
                    } catch (IOException e) {
                        // A broken client must not stop the server.
                        e.printStackTrace();
                        close(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Dispatches one selected key to the accept, read and write handling. */
    private static void handle(Selector sel, SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
            // accept() returns null in non-blocking mode when nothing is pending.
            if (sc != null) {
                System.out.println("有人连进来了");
                sc.configureBlocking(false);
                sc.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                System.out.println("有人注册了");
            }
            return;
        }

        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isReadable()) {
            String msg = readStrFromChannel(sc);
            if (msg == null) {
                // Peer closed the connection or the read failed.
                close(key);
                return;
            }
            System.out.println(msg);
        }
        if (key.isWritable()) {
            sc.write(ByteBuffer.wrap("hello world".getBytes()));
            // Cancelling the key alone would leave the socket open; close the
            // channel as well so the connection is really terminated.
            close(key);
        }
    }

    /** Cancels the key and closes its channel. */
    private static void close(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads everything currently available from the channel.
     *
     * @param sc the channel to read from
     * @return the received text (possibly empty), or {@code null} if the peer has
     *         closed the connection without pending data or the read failed
     */
    private static String readStrFromChannel(SocketChannel sc) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            int n;
            while ((n = sc.read(buffer)) > 0) {
                buffer.flip();
                bos.write(buffer.array(), 0, buffer.limit());
                // Reset position/limit so the next read can use the full capacity.
                buffer.clear();
            }
            if (n < 0 && bos.size() == 0) {
                return null;
            }
            return new String(bos.toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}
/**
 * Minimal NIO client: connects to localhost:8888 in non-blocking mode, sends one
 * greeting and prints whatever the server sends until the server disconnects.
 */
public class MyClient {

    /**
     * Runs the connect/read/write select loop for the single client channel.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (Selector sel = Selector.open();
             SocketChannel sc = SocketChannel.open()) {
            sc.configureBlocking(false);
            // A non-blocking connect may complete immediately; in that case
            // OP_CONNECT is never signalled and must not be waited for.
            boolean connected = sc.connect(new InetSocketAddress("localhost", 8888));
            SelectionKey key = sc.register(sel, connected
                    ? SelectionKey.OP_READ | SelectionKey.OP_WRITE
                    : SelectionKey.OP_CONNECT);

            ByteBuffer greeting = ByteBuffer.wrap("hello wokd".getBytes());

            while (key.isValid()) {
                sel.select();
                // Only one key is registered. Remove it from the selected set so
                // stale ready bits are not seen again on the next pass.
                if (!sel.selectedKeys().remove(key)) {
                    continue;
                }
                if (key.isConnectable() && sc.finishConnect()) {
                    // Connected: stop watching OP_CONNECT, start reading and writing.
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
                if (key.isReadable()) {
                    String src = readStrFromChannel(sc);
                    if (src == null) {
                        // Server closed the connection or the read failed.
                        break;
                    }
                    if (!src.isEmpty()) {
                        System.out.println(src);
                    }
                }
                if (key.isWritable()) {
                    sc.write(greeting);
                    if (!greeting.hasRemaining()) {
                        // Nothing left to send: keeping OP_WRITE registered would make
                        // select() return immediately on every call.
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads everything currently available from the channel.
     *
     * @param sc the channel to read from
     * @return the received text (possibly empty), or {@code null} if the peer has
     *         closed the connection without pending data or the read failed
     */
    private static String readStrFromChannel(SocketChannel sc) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(20);
        try {
            int n;
            while ((n = sc.read(buffer)) > 0) {
                buffer.flip();
                bos.write(buffer.array(), 0, buffer.limit());
                buffer.clear();
            }
            if (n < 0 && bos.size() == 0) {
                return null;
            }
            return new String(bos.toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

}