[jboss-cvs] JBoss Messaging SVN: r4845 - in trunk/src/main/org/jboss/messaging/core/remoting/impl: netty and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 20 02:44:33 EDT 2008
Author: trustin
Date: 2008-08-20 02:44:33 -0400 (Wed, 20 Aug 2008)
New Revision: 4845
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
Log:
Fixed ClientCrashTest failures
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-08-20 06:44:33 UTC (rev 4845)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.remoting.impl.mina;
import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.logging.Logger;
@@ -68,21 +70,35 @@
return;
}
- session.close().awaitUninterruptibly();
+ SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
- SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
- if (sslFilter != null)
- {
- try
+ if (session.getService() instanceof IoConnector) {
+ if (sslFilter != null)
{
- sslFilter.stopSsl(session).awaitUninterruptibly();
+ try
+ {
+ sslFilter.stopSsl(session).awaitUninterruptibly();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
}
- catch (Throwable t)
+ session.close().awaitUninterruptibly();
+ } else {
+ if (sslFilter != null)
{
- // ignore
+ try
+ {
+ sslFilter.stopSsl(session).addListener(IoFutureListener.CLOSE);
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+ } else {
+ session.close();
}
-
-
}
closed = true;
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java 2008-08-20 06:44:33 UTC (rev 4845)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Common handler implementation for client and server side handler.
+ *
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @version $Rev$, $Date$
+ */
+class MessagingChannelHandler extends SimpleChannelHandler
+{
+ private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
+
+ private final RemotingHandler handler;
+ private final ConnectionLifeCycleListener listener;
+ volatile boolean destroyed;
+
+ MessagingChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
+ {
+ this.handler = handler;
+ this.listener = listener;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+ {
+ synchronized (this)
+ {
+ if (!destroyed)
+ {
+ listener.connectionDestroyed(e.getChannel().getId());
+ destroyed = true;
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+ {
+ log.error(
+ "caught exception " + e.getCause() + " for channel " +
+ e.getChannel(), e.getCause());
+
+ synchronized (this)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+ me.initCause(e.getCause());
+ try {
+ listener.connectionException(e.getChannel().getId(), me);
+ destroyed = true;
+ } catch (Exception ex) {
+ log.error("failed to notify the listener:", ex);
+ }
+ }
+ }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java 2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java 2008-08-20 06:44:33 UTC (rev 4845)
@@ -22,7 +22,7 @@
package org.jboss.messaging.core.remoting.impl.netty;
-import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
@@ -31,15 +31,12 @@
import javax.net.ssl.SSLContext;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.RemotingHandler;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -47,9 +44,6 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
@@ -63,8 +57,6 @@
*/
public class NettyAcceptor implements Acceptor
{
- private static final Logger log = Logger.getLogger(NettyAcceptor.class);
-
private ExecutorService bossExecutor;
private ExecutorService workerExecutor;
private ChannelFactory channelFactory;
@@ -128,7 +120,7 @@
ChannelPipelineSupport.addSSLFilter(pipeline, context, false);
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new NettyHandler());
+ pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
return pipeline;
}
});
@@ -169,49 +161,18 @@
// Inner classes -----------------------------------------------------------------------------
@ChannelPipelineCoverage("one")
- private final class NettyHandler extends SimpleChannelHandler
+ private final class MessagingServerChannelHandler extends MessagingChannelHandler
{
+ MessagingServerChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
+ {
+ super(handler, listener);
+ }
+
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
Connection tc = new NettyConnection(e.getChannel());
listener.connectionCreated(tc);
}
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
- {
- boolean notify;
- synchronized (NettyAcceptor.this)
- {
- notify = channelFactory == null;
- }
-
- if (notify) {
- listener.connectionDestroyed(e.getChannel().getId());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
- {
- log.error(
- "caught exception " + e.getCause() + " for channel " +
- e.getChannel(), e.getCause());
- MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
- me.initCause(e.getCause());
- try {
- listener.connectionException(e.getChannel().getId(), me);
- } catch (Exception ex) {
- log.error("failed to notify the listener:", ex);
- }
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
- {
- ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
- }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java 2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java 2008-08-20 06:44:33 UTC (rev 4845)
@@ -80,6 +80,12 @@
channel.close();
}
+ // TODO Do not spin - use signal.
+ MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
+ while (!handler.destroyed) {
+ Thread.yield();
+ }
+
closed = true;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java 2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java 2008-08-20 06:44:33 UTC (rev 4845)
@@ -21,7 +21,7 @@
*/
package org.jboss.messaging.core.remoting.impl.netty;
-import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
@@ -31,7 +31,6 @@
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.RemotingHandler;
@@ -39,7 +38,6 @@
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.Connector;
import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -47,9 +45,6 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -166,7 +161,7 @@
ChannelPipelineSupport.addSSLFilter(pipeline, context, true);
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new NettyHandler());
+ pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
return pipeline;
}
});
@@ -212,13 +207,11 @@
// Inner classes -------------------------------------------------
@ChannelPipelineCoverage("one")
- private final class NettyHandler extends SimpleChannelHandler
+ private final class MessagingClientChannelHandler extends MessagingChannelHandler
{
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ MessagingClientChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
{
- ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ super(handler, listener);
}
@Override
@@ -230,27 +223,5 @@
sslHandler.handshake(e.getChannel());
}
}
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
- {
- listener.connectionDestroyed(e.getChannel().getId());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
- {
- log.error(
- "caught exception " + e.getCause() + " for channel " +
- e.getChannel(), e.getCause());
-
- MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
- me.initCause(e.getCause());
- try {
- listener.connectionException(e.getChannel().getId(), me);
- } catch (Exception ex) {
- log.error("failed to notify the listener:", ex);
- }
- }
}
}
More information about the jboss-cvs-commits
mailing list