`
kanpiaoxue
  • 浏览: 1749234 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

java net 编程(Socket,netty)

 
阅读更多

经常要用到网络编程。目前常用的服务器端有的Socket编程,还有NIO编程。

针对NIO编程,这里选择了一个成熟的框架Netty来编写服务器端。

而Socket的Server端,采用线程池来构造,能满足日常的一般需求。

 

========== netty

EchoServer.java

 

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class EchoServer {
	private static final Logger LOGGER = Logger.getLogger(EchoServer.class);

	private static final int PORT = 10007;
	private static final String FRAMER = "framer";
	private static final String DECODER = "decoder";
	private static final String ENCODER = "encoder";
	private static final String HANDLER = "handler";

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		LOGGER.info("start netty echo server.");
		ChannelFactory factory = new NioServerSocketChannelFactory(
				Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool());

		ServerBootstrap bootstrap = new ServerBootstrap(factory);

		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() {
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast(FRAMER, new DelimiterBasedFrameDecoder(
						Integer.MAX_VALUE, Delimiters.lineDelimiter()));
				pipeline.addLast(DECODER, new StringDecoder());
				pipeline.addLast(ENCODER, new StringEncoder());
				pipeline.addLast(HANDLER, new EchoServerHandler());
				return pipeline;
			}
		});

		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);

		bootstrap.bind(new InetSocketAddress(PORT));

	}

}

 

 

EchoServerHandler.java

 

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

import com.alibaba.fastjson.JSON;

public class EchoServerHandler extends SimpleChannelHandler {
	private static final Logger LOGGER = Logger.getLogger(EchoServer.class);

	private static final AtomicInteger COUNT = new AtomicInteger();

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
		String inputString = (String) e.getMessage();

		try {
			TransportMessage request = JSON.parseObject(inputString,
					TransportMessage.class);

			String response = null;
			if (request.getCode() % 2 == 0) {
				response = JSON.toJSONString(new TransportMessage(7100,
						"request.getState()%2 == 0" + request.getContent()));
			} else {
				response = JSON.toJSONString(new TransportMessage(8100, request
						.getContent()));
			}

			Channel channel = e.getChannel();

			byte[] arr = new StringBuilder(response).append("\tcount:")
					.append(COUNT.getAndIncrement()).append("\n").toString()
					.getBytes();
			ChannelBuffer word = ChannelBuffers.buffer(arr.length);
			word.writeBytes(arr);
			channel.write(word);
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(Thread.currentThread().getName() + ":"
						+ e.getRemoteAddress() + " receives message : "
						+ inputString + " -- send message : " + response);
			}
			
			System.out.println(Thread.currentThread().getName() + ":"
						+ e.getRemoteAddress() + " receives message : "
						+ inputString + " -- send message : " + response);
			
		} catch (Exception e1) {
			LOGGER.error("Error:" + e.getMessage(), e1);
		}
	}

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		ctx.sendUpstream(e);
	}

	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		ctx.sendUpstream(e);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
		LOGGER.error("Error:" + e.getCause().getMessage(), e.getCause());
		Channel ch = e.getChannel();
		ch.close();
	}
}

 

TransportMessage.java

 

public class TransportMessage {
	private int code;
	private String content;

	public TransportMessage(int code, String content) {
		super();
		this.code = code;
		this.content = content;
	}

	public TransportMessage() {
		super();
	}

	public int getCode() {
		return code;
	}

	public void setCode(int code) {
		this.code = code;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	@Override
	public int hashCode() {
		final int prime = 37;
		int result = 17;
		result = prime * result + code;
		result = prime * result + ((content == null) ? 0 : content.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		TransportMessage other = (TransportMessage) obj;
		if (code != other.code)
			return false;
		if (content == null) {
			if (other.content != null)
				return false;
		} else if (!content.equals(other.content))
			return false;
		return true;
	}

	@Override
	public String toString() {
		return "TransportMessage [code=" + code + ", content=" + content + "]";
	}

}

 

 

======== socket server (with thread pool)

SocketServer.java

 

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class SocketServer implements Runnable {

	private final String name;
	private final ServerSocket server;

	private final BlockingQueue<Socket> queue;

	public SocketServer(String name, BlockingQueue<Socket> queue, int port)
			throws IOException {
		super();
		this.name = name;
		this.server = new ServerSocket(port);
		this.server.setReuseAddress(true);
		this.queue = queue;
		System.out.println(server + " start to run.");
	}

	@Override
	public void run() {
		Thread.currentThread().setName(name);
		while (true) {
			try {
				queue.put(server.accept());
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}

	}

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		int port = 7777;
		int threadPoolSize = Runtime.getRuntime().availableProcessors() * 6;
		BlockingQueue<Socket> queue = new LinkedBlockingQueue<Socket>();
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new SocketServer("producer", queue, port));
		for (int i = 0; i < threadPoolSize; i++) {
			exec.execute(new Consumer(queue, "consumer-" + i));
		}
		exec.shutdown();

	}

}

 

 

Consumer.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.fastjson.JSON;

public class Consumer implements Runnable {
	private static final AtomicInteger count = new AtomicInteger();
	private final BlockingQueue<Socket> queue;
	private final String name;

	public Consumer(BlockingQueue<Socket> queue, String name) {
		super();
		this.queue = queue;
		this.name = name;
	}

	public void run() {
		Thread.currentThread().setName(name);
		System.out.println(name + " start to run.");
		while (true) {
			try {
				consume(queue.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	private void consume(Socket socket) throws IOException {
		try {
			socket.setTcpNoDelay(true);

			BufferedReader br = getReader(socket);
			PrintWriter pw = getWriter(socket);
			for (String msg = br.readLine(); msg != null; msg = br.readLine()) {
				System.out.println(Thread.currentThread().getName()
						+ " receive msg : " + msg + " from "
						+ socket.getRemoteSocketAddress() + ":"
						+ socket.getPort());

				String response = echoMsg(msg);
				pw.println(response);
				pw.flush();
				System.out.println(Thread.currentThread().getName()
						+ " send msg: " + response);
				if (msg.equalsIgnoreCase("bye")) {
					break;
				}
			}
		} finally {
			if (null != socket) {
				try {
					socket.close();
					System.out.println(Thread.currentThread().getName()
							+ " closes " + socket);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private PrintWriter getWriter(Socket socket) throws IOException {
		return new PrintWriter(socket.getOutputStream());
	}

	private BufferedReader getReader(Socket socket) throws IOException {
		return new BufferedReader(
				new InputStreamReader(socket.getInputStream()));
	}

	private String echoMsg(String request) {

		TransportMessage msg = JSON
				.parseObject(request, TransportMessage.class);

		return msg.getCode() + " --> world_" + count.getAndIncrement();
	}

}

 

=== socket client

SocketClient.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

import com.alibaba.fastjson.JSON;

public class SocketClient {

	
	private final Socket socket;

	public SocketClient(String host, int port) throws UnknownHostException,
			IOException {
		this.socket = new Socket(host, port);
		this.socket.setTcpNoDelay(true);
		this.socket.setTrafficClass(0x08 | 0x10);
		this.socket.setReuseAddress(true);
		
	}

	private PrintWriter getWriter(Socket socket) throws IOException {
		return new PrintWriter(socket.getOutputStream());
	}

	private BufferedReader getReader(Socket socket) throws IOException {
		return new BufferedReader(
				new InputStreamReader(socket.getInputStream()));
	}

	private void talk() {
		try {
			BufferedReader br = getReader(socket);
			PrintWriter pw = getWriter(socket);
			for (int i = 0; i < 100; i++) {
				TransportMessage tm = new TransportMessage(i, "Hello_" + i);
				String msg = JSON.toJSONString(tm);
				pw.println(msg);
				pw.flush();
				System.out.println("send msg : " + msg);

				System.out.println("receive msg : " + br.readLine());

			}

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (null != socket) {
				try {
					socket.close();
				} catch (IOException ex) {
					ex.printStackTrace();
				}
			}
		}

	}

	public static void main(String[] args) throws UnknownHostException,
			IOException {
		String host = "192.168.123.53";
		int port = 7777;
//		int port = 10007;

		for (int i = 0; i < 100000; i++) {
			new SocketClient(host, port).talk();
		}

	}
}

 

分享到:
评论

相关推荐

    java socket服务器与客户端的通信实现用户登录

    这个socket通信比较直观,比较容易看懂,实现了Java中的socket的通信问题。是Java网络编程的一个比较不错的例子!

    Netty的Socket编程详解-搭建服务端与客户端并进行数据传输示例代码.rar

    Netty的Socket编程详解-搭建服务端与客户端并进行数据传输示例代码.

    netty 2019最新源码与示例

    netty 2019 最新源码, 包含示例 Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发...Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

    Android-netty和socket通信的demo

    Netty是基于Java NIO client-server的网络应用框架,使用Netty可以快速开发网络应用

    Java视频教程 Java游戏服务器端开发 Netty NIO AIO Mina视频教程

    二、java NIO,AIO编程视频教程 1、java NIO,AIO编程_01.flv 2、java NIO,AIO编程_02.flv 3、java NIO,AIO编程_03.flv 4、java NIO,AIO编程_04.flv 5、java NIO,AIO编程_05.flv 三、Java语言基础教程-Java NIO...

    netty-demo实例

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括...

    netty-all-4.1.29.Final-sources.jar 最新版netty源码

    Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源...

    Netty 快速入门系列-源码

    Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 Netty是一个NIO客户端、服务端框架。允许快速简单的开发网络应用程序。...

    从NIO到Netty,编程实战出租车905协议-08172347.pdf

    第2章,介绍在Socket编程过程中一些基础知识,让大家建立起对这块知识内容的一个整体轮廓; 第3章,结合905.4-2014协议的基本内容,动手实现NIO长连接服务端的实现,以及协议内容的设计和实现思路; 第4章,实现长...

    Netty(Java 网络服务框架) v4.0.44.zip

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议...

    netty电子书

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括...

    netty4官方包

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括...

    netty-all-4.1.29.Final.jar最新版导入直接用

    Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源...

    netty in action (meap)

    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的...Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

    Netty(Java 网络服务框架)v4.1.53.zip

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议...

    Netty3.1 中文用户手册

    Java网络编程框架,学写使用Netty开发Socket服务

    netty-demo

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括...

    netty-4.1.15.Final.tar.bz2-2017-8-25

    Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种...

    netty-4.1.49.Final.tar.bz2

    Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源...

Global site tag (gtag.js) - Google Analytics