2019-06-13 13:42:47 274浏览
今天千锋扣丁学堂Java培训老师给大家分享一篇关于探索Java开发I/O模型演进的详细介绍,什么是同步?什么是异步?阻塞和非阻塞又有什么区别?本文先从Unix的I/O模型讲起,介绍了5种常见的I/O模型。而后再引出Java的I/O模型的演进过程,并用实例说明如何选择合适的JavaI/O模型来提高系统的并发量和可用性。
I/O多路复用会用到select或者poll函数,这两个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。
首先我们允许socket进行信号驱动I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用I/O操作函数处理数据。
调用aio_read函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。
同步I/O操作引起请求进程阻塞,直到I/O操作完成。异步I/O操作不引起请求进程阻塞。
public class EchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
try (
ServerSocket serverSocket =
new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port "
+ port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
public class MultiThreadEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// MultiThread
new Thread(new EchoServerHandler(clientSocket)).start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
public class EchoServerHandler implements Runnable {
private Socket clientSocket;
public EchoServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}
public class ThreadPoolEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// Thread Pool
threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
public class NonBlokingEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException ex) {
ex.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
public class AsyncEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
// create asynchronous server socket channel bound to the default group
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) {
if (asynchronousServerSocketChannel.isOpen()) {
// set some options
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// bind the server socket channel to local address
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
// display a waiting message while ... waiting clients
System.out.println("Waiting for connections ...");
while (true) {
Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
.accept();
try {
final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
.get();
Callable<String> worker = new Callable<String>() {
@Override
public String call() throws Exception {
String host = asynchronousSocketChannel.getRemoteAddress().toString();
System.out.println("Incoming connection from: " + host);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// transmitting data
while (asynchronousSocketChannel.read(buffer).get() != -1) {
buffer.flip();
asynchronousSocketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
asynchronousSocketChannel.close();
System.out.println(host + " was successfully served!");
return host;
}
};
taskExecutor.submit(worker);
} catch (InterruptedException | ExecutionException ex) {
System.err.println(ex);
System.err.println("\n Server is shutting down ...");
// this will make the executor accept no new threads
// and finish all existing threads in the queue
taskExecutor.shutdown();
// wait until all threads are finished
while (!taskExecutor.isTerminated()) {
}
break;
}
}
} else {
System.out.println("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException ex) {
System.err.println(ex);
}
}
}
【关注微信公众号获取更多学习资料】 【扫码进入JavaEE/微服务VIP免费公开课】