My netty program running on Linux has strange question

Christian Migowski chrismfwrd at gmail.com
Thu Apr 29 02:53:19 EDT 2010


Hello,

I am still not sure that I understand your problem, but here are a few hints:

Your Amf3ServerPipelineFactory just creates and adds
Amf3Client/Amf3Server to the pipeline so there is no delay possible
due to your own code, right?
Have you tried increasing the backlog on the server? For example, in your code:
_options.put("backlog", 200); (I have no idea what the Netty default
value is.)
What is the value of the Linux TCP SYN backlog? Maybe it is
configured too small (you can check it with "cat
/proc/sys/net/ipv4/tcp_max_syn_backlog").

regards,
christian!


On Thu, Apr 29, 2010 at 6:05 AM, mumuyu99 <99yangyiqiang99 at 163.com> wrote:
>
> hi
>   I'm sorry, my English is so poor!
>
>   My question is :
>
>   When the server runs on Linux and accepts 200 client socket connections at
> the same time,
> it pauses for a moment after every 20-30 accepted connections; each pause is
> short!
>
>  But this problem does not happen on Windows!
>
>  My client code:
>
>
> public abstract class Amf3Client extends SimpleChannelHandler {
>
>        private static final Logger logger = LoggerFactory
>                        .getLogger(Amf3Client.class);
>
>        protected String _host;
>
>        protected int _port;
>
>        protected ClientBootstrap _bootstrap;
>
>        protected NioClientSocketChannelFactory _factory;
>
>        protected Channel _channel;
>
>        protected int _workerCount;
>
>        protected CallPool _callPool;
>
>        protected Amf3Request _request;
>
>        private void init() {
>                _factory = new NioClientSocketChannelFactory(Executors
>                                .newCachedThreadPool(), Executors.newCachedThreadPool());
>                _bootstrap = new ClientBootstrap(_factory);
>                _bootstrap.setPipelineFactory(new Amf3ServerPipelineFactory(this));
>                HashMap<String, Object> options = new HashMap<String, Object>();
>                options.put("tcpNoDelay", true);
>                options.put("keepAlive", true);
>                _bootstrap.setOptions(options);
>                _request = new Amf3Request();
>                _callPool = new CallPool();
>                initCallPool();
>        }
>
>        protected abstract void initCallPool();
>
>        public Amf3Client(String host, int port) {
>                _host = host;
>                _port = port;
>                init();
>        }
>
>        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
>                        throws Exception {
>                super.handleUpstream(ctx, e);
>        }
>
>        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
>        }
>
>        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
>                        throws Exception {
>
>        }
>
>        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
>                if (e.getCause() instanceof ReadTimeoutException) {
>                        System.out.println("read time out.id=" + e.getChannel().getId());
>                } else if (e.getCause() instanceof ClosedChannelException) {
>                        System.out.println("ClosedChannel.id=" + e.getChannel().getId());
>                } else if (e.getCause() instanceof IOException) {
>                        System.out.println("IOException.id=" + e.getChannel().getId());
>                } else {
>                        System.out.println("other.id=" + e.getChannel().getId());
>                        System.out.println(e.getCause().toString());
>                }
>        }
>
>        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
>                _request.parseAS(e.getMessage());
>                _callPool.execute(_request);
>        }
>
>        public void connect() {
>                ChannelFuture future = _bootstrap.connect(new InetSocketAddress(_host,
>                                _port));
>                _channel = future.awaitUninterruptibly().getChannel();
>                if (!future.isSuccess()) {
>                        logger.error("error creating client connection: {}", future
>                                        .getCause().getMessage());
>                } else {
>                        logger.info("connect success.id={}", _channel.getId());
>                }
>        }
>
>        public void disconnect() {
>                _channel.close().awaitUninterruptibly();
>        }
>
>        public void send(Amf3Request request) {
>                if (_channel.isConnected()) {
>                        _channel.write(request.toData());
>                }
>        }
> }
>
> Test code:
>
> /**
>  * An {@code Amf3Client} that has a single scripted behaviour, started
>  * through {@link #think()}.
>  */
> public abstract class ARobot extends Amf3Client {
>
>        /**
>         * @param host remote host, forwarded to {@code Amf3Client}
>         * @param port remote port, forwarded to {@code Amf3Client}
>         */
>        public ARobot(String host, int port) {
>                super(host, port);
>        }
>
>        /** Runs this robot's behaviour; defined by each concrete robot. */
>        protected abstract void think();
> }
>
>
>
> public class LoginRobot extends ARobot {
>
>        private static Logger _logger = LoggerFactory.getLogger(LoginRobot.class);
>
>        protected String _username;
>
>        protected String _pwd;
>
>        protected UserData _userData;
>
>        @Override
>        protected void initCallPool() {
>                _callPool.addCallback(this, "call_login");
>                _callPool.addCallback(this, "call_regUser");
>        }
>
>        protected void login() {
>                send(new Amf3Request("login", _username, _pwd));
>        }
>
>        @Override
>        public void think() {
>                connect();
>                login();
>        }
>
>        public LoginRobot(String host, int port, String username, String pwd) {
>                super(host, port);
>                _username = username;
>                _pwd = pwd;
>                _userData = new UserData();
>        }
>
>        public void call_login(ASObject obj) {
>                _userData.parse(obj);
>                if (_userData.getCode() == InfoCode.LOGIN_INVALID_USERNAME) {
>                        _logger.debug("reg");
>                        send(new Amf3Request("regUser", _username, _pwd, "a at b.c"));
>                } else {
>                        _logger.debug("loginSuccess.username={}", _username);
>                }
>        }
>
>        public void call_regUser(int code) {
>                if (code == InfoCode.NONE) {
>                        send(new Amf3Request("login", _username, _pwd));
>                }
>        }
> }
>
>
>
>
>
> My server code:
>
>
> public abstract class Amf3Server extends SimpleChannelHandler {
>
>        private static final Logger _logger = LoggerFactory
>                        .getLogger(Amf3Server.class);
>
>        private ServerBootstrap _bootstrap;
>
>        private SocketData _data;
>
>        private HashMap<String, Object> _options;
>
>        private int _workerCount;
>
>        protected CallPool _callPool;
>
>        protected AtomicBoolean _isStart;
>
>        private void init() {
>                _workerCount = (Runtime.getRuntime().availableProcessors() * 2 + 1);
>                _options = new HashMap<String, Object>();
>                _options.put("child.tcpNoDelay", true);
>                _options.put("child.keepAlive", true);
>                _options.put("child.connectTimeoutMillis", 50000);
>                _callPool = new CallPool();
>                _isStart = new AtomicBoolean(false);
>                initServer();
>                onInit();
>        }
>
>        private void initServer() {
>                NioServerSocketChannelFactory factory = new NioServerSocketChannelFactory(
>                                Executors.newCachedThreadPool(), Executors
>                                                .newCachedThreadPool(), _workerCount);
>                _bootstrap = new ServerBootstrap(factory);
>                _bootstrap.setOptions(_options);
>                _bootstrap.setPipelineFactory(new Amf3ServerPipelineFactory(this));
>        }
>
>        protected boolean startServer() {
>                try {
>                        _bootstrap.bind(new InetSocketAddress(_data.getPort())).isBound();
>                        _logger.info("start amf3 server done.");
>                        return true;
>                } catch (ChannelException e) {
>                        _logger.error("server start error={}", e.getMessage());
>                        return false;
>                }
>        }
>
>        protected void stopServer() {
>                _bootstrap.getFactory().releaseExternalResources();
>        }
>
>        protected abstract void onInit();
>
>        protected abstract void onStart();
>
>        protected abstract void onStop();
>
>        public Amf3Server(SocketData data) {
>                _data = data;
>                init();
>        }
>
>        public void start() {
>                if (_isStart.get()) {
>                        return;
>                }
>                if (startServer()) {
>                        onStart();
>                        _isStart.set(true);
>                        _logger.info("server start done.");
>                }
>        }
>
>        public void stop() {
>                if (!_isStart.get()) {
>                        return;
>                }
>                _isStart.set(false);
>                stopServer();
>                onStop();
>        }
> }
>
>
>
>
> When I run 200 clients, for example:
> // Start the login robots one after another against the test server.
> // NOTE(review): the surrounding text mentions 200 clients, but this loop
> // creates 500 - confirm which number was actually used.
> for (int robotIndex = 0; robotIndex < 500; robotIndex++) {
>        LoginRobot robot = new LoginRobot("210.51.56.83", 80,
>                        "robot_" + robotIndex, "pwd_" + robotIndex);
>        robot.think();
> }
>
>
> The problem turns up:
>
>
>
>
>
>
> --
> View this message in context: http://netty-forums-and-mailing-lists.685743.n2.nabble.com/My-netty-program-running-on-Linux-has-strange-question-tp4973543p4978240.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
>



More information about the netty-users mailing list