2019-06-13 13:42:47 228浏览
今天千锋扣丁学堂Java培训老师给大家分享一篇关于探索Java开发I/O模型演进的详细介绍,什么是同步?什么是异步?阻塞和非阻塞又有什么区别?本文先从Unix的I/O模型讲起,介绍了5种常见的I/O模型。而后再引出Java的I/O模型的演进过程,并用实例说明如何选择合适的JavaI/O模型来提高系统的并发量和可用性。
I/O多路复用会用到select或者poll函数,这两个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。
首先我们允许socket进行信号驱动I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用I/O操作函数处理数据。
调用aio_read函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。
同步I/O操作引起请求进程阻塞,直到I/O操作完成。异步I/O操作不引起请求进程阻塞。
public class EchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } try ( ServerSocket serverSocket = new ServerSocket(port); Socket clientSocket = serverSocket.accept(); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); ) { String inputLine; while ((inputLine = in.readLine()) != null) { out.println(inputLine); } } catch (IOException e) { System.out.println("Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
public class MultiThreadEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) { clientSocket = serverSocket.accept(); // MultiThread new Thread(new EchoServerHandler(clientSocket)).start(); } } catch (IOException e) { System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
public class EchoServerHandler implements Runnable { private Socket clientSocket; public EchoServerHandler(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { String inputLine; while ((inputLine = in.readLine()) != null) { out.println(inputLine); } } catch (IOException e) { System.out.println(e.getMessage()); } } }
public class ThreadPoolEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } ExecutorService threadPool = Executors.newFixedThreadPool(5); Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) { clientSocket = serverSocket.accept(); // Thread Pool threadPool.submit(new Thread(new EchoServerHandler(clientSocket))); } } catch (IOException e) { System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
public class NonBlokingEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } System.out.println("Listening for connections on port " + port); ServerSocketChannel serverChannel; Selector selector; try { serverChannel = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(port); serverChannel.bind(address); serverChannel.configureBlocking(false); selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException ex) { ex.printStackTrace(); return; } while (true) { try { selector.select(); } catch (IOException ex) { ex.printStackTrace(); break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ); ByteBuffer buffer = ByteBuffer.allocate(100); clientKey.attach(buffer); } if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); client.read(output); } if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); output.flip(); client.write(output); output.compact(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { } } } } } }
public class AsyncEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); // create asynchronous server socket channel bound to the default group try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) { if (asynchronousServerSocketChannel.isOpen()) { // set some options asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024); asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // bind the server socket channel to local address asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); // display a waiting message while ... waiting clients System.out.println("Waiting for connections ..."); while (true) { Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel .accept(); try { final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture .get(); Callable<String> worker = new Callable<String>() { @Override public String call() throws Exception { String host = asynchronousSocketChannel.getRemoteAddress().toString(); System.out.println("Incoming connection from: " + host); final ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // transmitting data while (asynchronousSocketChannel.read(buffer).get() != -1) { buffer.flip(); asynchronousSocketChannel.write(buffer).get(); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } } asynchronousSocketChannel.close(); System.out.println(host + " was successfully served!"); return host; } }; taskExecutor.submit(worker); } catch (InterruptedException | ExecutionException ex) { System.err.println(ex); System.err.println("\n Server is shutting down ..."); // this will make the executor accept no new threads // and finish all existing threads in the queue taskExecutor.shutdown(); // wait until all threads are finished while (!taskExecutor.isTerminated()) { } break; } } } else { System.out.println("The asynchronous server-socket channel cannot be opened!"); } } catch (IOException ex) { System.err.println(ex); } } }
【关注微信公众号获取更多学习资料】 【扫码进入JavaEE/微服务VIP免费公开课】