[jboss-cvs] JBoss Messaging SVN: r4807 - in trunk: src/main/org/jboss/messaging/core/remoting and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 18 02:38:52 EDT 2008


Author: trustin
Date: 2008-08-18 02:38:52 -0400 (Mon, 18 Aug 2008)
New Revision: 4807

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java
Log:
* Added Netty transport and its related unit tests
* Changed connectionID type from Long to Object because Netty uses UUID as a connectionID

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.client.impl;
 
@@ -46,7 +46,7 @@
    public ClientConsumerPacketHandler(final ClientConsumerInternal clientConsumer, final long consumerID)
    {
       this.clientConsumer = clientConsumer;
-      
+
       this.consumerID = consumerID;
    }
 
@@ -55,14 +55,14 @@
       return consumerID;
    }
 
-   public void handle(final long sessionID, final Packet packet)
+   public void handle(final Object connectionID, final Packet packet)
    {
       byte type = packet.getType();
-      
+
       if (type == PacketImpl.RECEIVE_MSG)
       {
          ReceiveMessage message = (ReceiveMessage) packet;
-         
+
          try
          {
             clientConsumer.handleMessage(message.getClientMessage());
@@ -83,16 +83,17 @@
    {
       return "ClientConsumerPacketHandler[id=" + consumerID + "]";
    }
-   
+
+   @Override
    public boolean equals(Object other)
    {
       if (other instanceof ClientConsumerPacketHandler == false)
       {
          return false;
       }
-            
+
       ClientConsumerPacketHandler r = (ClientConsumerPacketHandler)other;
-      
-      return r.consumerID == this.consumerID;     
+
+      return r.consumerID == consumerID;
    }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.client.impl;
 
@@ -29,9 +29,9 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
 
 /**
- * 
+ *
  * A ClientProducerPacketHandler
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -46,7 +46,7 @@
    public ClientProducerPacketHandler(final ClientProducerInternal clientProducer, final long producerID)
    {
       this.clientProducer = clientProducer;
-      
+
       this.producerID = producerID;
    }
 
@@ -55,14 +55,14 @@
       return producerID;
    }
 
-   public void handle(final long sessionID, final Packet packet)
-   {    
+   public void handle(final Object connectionID, final Packet packet)
+   {
       byte type = packet.getType();
-      
+
       if (type == PacketImpl.PROD_RECEIVETOKENS)
       {
          ProducerFlowCreditMessage message = (ProducerFlowCreditMessage) packet;
-         
+
          try
          {
             clientProducer.receiveCredits(message.getTokens());
@@ -75,7 +75,7 @@
       else
       {
       	throw new IllegalStateException("Invalid packet: " + type);
-      }      
+      }
    }
 
    @Override
@@ -83,16 +83,17 @@
    {
       return "ClientProducerPacketHandler[id=" + producerID + "]";
    }
-   
+
+   @Override
    public boolean equals(Object other)
    {
       if (other instanceof ClientProducerPacketHandler == false)
       {
          return false;
       }
-            
+
       ClientProducerPacketHandler r = (ClientProducerPacketHandler)other;
-      
-      return r.producerID == this.producerID;     
+
+      return r.producerID == producerID;
    }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -25,17 +25,17 @@
 import org.jboss.messaging.core.remoting.spi.Connection;
 
 /**
- * 
+ *
  * A ConnectionLifeCycleListener
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public interface ConnectionLifeCycleListener
 {
    void connectionCreated(Connection connection);
-   
-   void connectionDestroyed(long connectionID);
-   
-   void connectionException(long connectionID, MessagingException me);
+
+   void connectionDestroyed(Object connectionID);
+
+   void connectionException(Object connectionID, MessagingException me);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting;
 
@@ -34,13 +34,13 @@
 
    void unregister(long handlerID);
 
-   void dispatch(long connectionID, Packet packet) throws Exception;
+   void dispatch(Object connectionID, Packet packet) throws Exception;
 
    long generateID();
-   
+
    void addInterceptor(Interceptor filter);
 
    void removeInterceptor(Interceptor filter);
-   
+
    List<Interceptor> getInterceptors();
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting;
 
@@ -26,20 +26,20 @@
 /**
  * A PacketHandler handles packets (as defined by {@link Packet} and its
  * subclasses).
- * 
+ *
  * It must have an ID unique among all PacketHandlers (or at least among those
  * registered into the same RemoteDispatcher).
- * 
+ *
  * @see PacketDispatcher#register(PacketHandler)
  * @see PacketDispatcher#unregister(long)
- * 
+ *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
+ *
  * @version <tt>$Revision$</tt>
  */
 public interface PacketHandler
 {
    long getID();
 
-   void handle(long connectionID, Packet packet);
+   void handle(Object connectionID, Packet packet);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting;
 
@@ -26,36 +26,36 @@
 import org.jboss.messaging.core.exception.MessagingException;
 
 /**
- * 
+ *
  * A RemotingConnection
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  *
  */
 public interface RemotingConnection
 {
-   long getID();
-      
+   Object getID();
+
    Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
-   
+
    void sendOneWay(long targetID, long executorID, Packet packet);
-   
+
    Packet sendBlocking(Packet packet) throws MessagingException;
-   
+
    void sendOneWay(Packet packet);
-   
+
    void addFailureListener(FailureListener listener);
-   
+
    boolean removeFailureListener(FailureListener listener);
-   
+
    PacketDispatcher getPacketDispatcher();
-   
+
    Location getLocation();
-   
+
    MessagingBuffer createBuffer(int size);
-   
+
    void fail(MessagingException me);
-   
+
    void destroy();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,27 +18,27 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 package org.jboss.messaging.core.remoting;
 
 import java.util.Set;
 
 /**
- * 
+ *
  * A RemotingHandler
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public interface RemotingHandler
 {
-   void bufferReceived(long connectionID, MessagingBuffer buffer) throws Exception;
-   
+   void bufferReceived(Object connectionID, MessagingBuffer buffer) throws Exception;
+
    int isReadyToHandle(MessagingBuffer buffer);
-   
+
    void closeExecutor(long executorID);
-   
-   Set<Long> scanForFailedConnections(long expirePeriod);
-   
-   void removeLastPing(long connectionID);   
+
+   Set<Object> scanForFailedConnections(long expirePeriod);
+
+   void removeLastPing(Object connectionID);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting;
 
@@ -37,11 +37,11 @@
 {
    PacketDispatcher getDispatcher();
 
-   RemotingConnection getConnection(long remotingConnectionID);  
-   
+   RemotingConnection getConnection(Object remotingConnectionID);
+
    Set<RemotingConnection> getConnections();
-   
+
    void registerAcceptorFactory(AcceptorFactory factory);
-   
+
    void unregisterAcceptorFactory(AcceptorFactory factory);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl;
 
@@ -36,9 +36,9 @@
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingHandler;
 import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.Connector;
 import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
-import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.util.JBMThreadFactory;
 
 /**
@@ -49,23 +49,23 @@
 public class ConnectionRegistryImpl implements ConnectionRegistry, ConnectionLifeCycleListener
 {
    // Constants -----------------------------------------------------
-     
+
    public static final Logger log = Logger.getLogger(ConnectionRegistryImpl.class);
-   
 
+
    // Attributes ----------------------------------------------------
 
-   private Map<String, ConnectionHolder> connections = new HashMap<String, ConnectionHolder>();
-         
-   private Map<TransportType, ConnectorFactory> connectorFactories = new HashMap<TransportType, ConnectorFactory>();
-   
-   private Map<Long, RemotingConnection> remotingConnections = new HashMap<Long, RemotingConnection>();
-   
+   private final Map<String, ConnectionHolder> connections = new HashMap<String, ConnectionHolder>();
+
+   private final Map<TransportType, ConnectorFactory> connectorFactories = new HashMap<TransportType, ConnectorFactory>();
+
+   private final Map<Object, RemotingConnection> remotingConnections = new HashMap<Object, RemotingConnection>();
+
    //TODO - core pool size should be configurable
-   private ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(20, new JBMThreadFactory("jbm-pinger-threads"));
+   private final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(20, new JBMThreadFactory("jbm-pinger-threads"));
 
    // Static --------------------------------------------------------
-   
+
    // ConnectionRegistry implementation -----------------------------
 
    public synchronized RemotingConnection getConnection(final Location location, final ConnectionParams connectionParams)
@@ -73,11 +73,11 @@
       String key = location.getLocation();
 
       ConnectionHolder holder = connections.get(key);
-      
+
       if (holder != null)
-      {         
+      {
          holder.increment();
-         
+
          RemotingConnection connection = holder.getConnection();
 
          if (log.isDebugEnabled())
@@ -88,22 +88,22 @@
          return connection;
       }
       else
-      {                        
+      {
          PacketDispatcher dispatcher = new PacketDispatcherImpl(null);
-         
+
          RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null);
-         
+
          Connector connector = createConnector(location, connectionParams, handler, this);
-         
+
          connector.start();
-         
+
          Connection tc = connector.createConnection();
-         
+
          if (tc == null)
          {
             throw new IllegalStateException("Failed to connect to " + location);
          }
-         
+
          long pingInterval = connectionParams.getPingInterval();
          RemotingConnection connection;
          if (pingInterval != -1)
@@ -118,26 +118,26 @@
          }
 
          remotingConnections.put(tc.getID(), connection);
-         
+
          if (log.isDebugEnabled())
          {
             log.debug("Created " + connector + " to connect to "  + location);
          }
-   
+
          holder = new ConnectionHolder(connection, connector);
-         
+
          connections.put(key, holder);
-         
+
          return connection;
       }
    }
-   
+
    public synchronized void returnConnection(final Location location)
    {
       String key = location.getLocation();
 
       ConnectionHolder holder = connections.get(key);
-      
+
       if (holder == null)
       {
          throw new IllegalStateException("No connection for location " + key);
@@ -149,13 +149,13 @@
          {
             log.debug("Removed connection for " + key);
          }
-                     
+
          RemotingConnection conn = remotingConnections.remove(holder.getConnection().getID());
-         
+
          conn.destroy();
-         
+
          holder.getConnector().close();
-         
+
          connections.remove(key);
       }
       else
@@ -164,15 +164,15 @@
          if (log.isDebugEnabled())
          {
             log.debug(holder.getCount() + " remaining references to " + key);
-         }         
+         }
       }
    }
 
    public synchronized int size()
    {
-      return this.connections.size();
+      return connections.size();
    }
-   
+
    public synchronized void registerConnectorFactory(final TransportType transport, final ConnectorFactory factory)
    {
       connectorFactories.put(transport, factory);
@@ -182,7 +182,7 @@
    {
       connectorFactories.remove(transport);
    }
-   
+
    public synchronized ConnectorFactory getConnectorFactory(final TransportType transport)
    {
       return connectorFactories.get(transport);
@@ -191,51 +191,51 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-         
+
    // ConnectionLifeCycleListener implementation --------------------
-   
+
    public void connectionCreated(final Connection connection)
-   {      
+   {
    }
 
-   public void connectionDestroyed(final long connectionID)
-   {      
+   public void connectionDestroyed(final Object connectionID)
+   {
       RemotingConnection conn = remotingConnections.remove(connectionID);
-      
+
       if (conn != null)
       {
          ConnectionHolder holder = connections.remove(conn.getLocation().getLocation());
-         
+
          //If conn still exists here this means that the underlying transport connection has been closed from the server side without
          //being returned from the client side so we need to fail the connection and call it's listeners
          MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED,
                                                         "The connection has been closed.");
          conn.fail(me);
-         
-         holder.getConnector().close();         
+
+         holder.getConnector().close();
       }
    }
 
-   public void connectionException(final long connectionID, final MessagingException me)
+   public void connectionException(final Object connectionID, final MessagingException me)
    {
       RemotingConnection conn = remotingConnections.remove(connectionID);
-      
+
       if (conn == null)
       {
          throw new IllegalStateException("Cannot find connection with id " + connectionID);
       }
-      
+
       ConnectionHolder holder = connections.remove(conn.getLocation().getLocation());
-      
+
       conn.fail(me);
-      
+
       holder.getConnector().close();
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-     
+
    // Private -------------------------------------------------------
 
    private Connector createConnector(final Location location,
@@ -244,7 +244,7 @@
          final ConnectionLifeCycleListener listener)
    {
       ConnectorFactory factory = connectorFactories.get(location.getTransport());
-      
+
       if (factory == null)
       {
          throw new IllegalStateException("No connector factory registered for transport " + location.getTransport());
@@ -258,9 +258,9 @@
    private static class ConnectionHolder
    {
       private final RemotingConnection connection;
-      
+
       private final Connector connector;
-      
+
       private int count;
 
       public ConnectionHolder(final RemotingConnection connection, final Connector connector)
@@ -269,7 +269,7 @@
 
          this.connection = connection;
          this.connector = connector;
-         this.count = 1;
+         count = 1;
       }
 
       public void increment()
@@ -291,7 +291,7 @@
       {
          return connection;
       }
-      
+
       public Connector getConnector()
       {
          return connector;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl;
 
@@ -56,7 +56,7 @@
 
    private final AtomicLong idSequence = new AtomicLong(0);
 
-   private List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    // Static --------------------------------------------------------
 
@@ -67,7 +67,7 @@
       handlers = new ConcurrentHashMap<Long, PacketHandler>();
       if (filters != null)
       {
-         this.interceptors.addAll(filters);
+         interceptors.addAll(filters);
       }
    }
 
@@ -115,21 +115,21 @@
    {
       interceptors.remove(filter);
    }
-   
+
    public List<Interceptor> getInterceptors()
    {
       return new ArrayList<Interceptor>(interceptors);
    }
 
-   public void dispatch(final long connectionID, final Packet packet) throws Exception
+   public void dispatch(final Object connectionID, final Packet packet) throws Exception
    {
       long targetID = packet.getTargetID();
-      
+
       PacketHandler handler = getHandler(targetID);
-      
+
       if (handler != null)
       {
-         if (callInterceptors(packet))                  
+         if (callInterceptors(packet))
          {
             handler.handle(connectionID, packet);
          }
@@ -161,7 +161,7 @@
             {
                log.warn("Failed in calling interceptor: " + interceptor, e);
             }
-         }         
+         }
       }
       return true;
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -127,7 +127,7 @@
    // RemotingConnection implementation
    // ------------------------------------------------------------
 
-   public long getID()
+   public Object getID()
    {
       return transportConnection.getID();
    }
@@ -240,9 +240,9 @@
    }
 
    public synchronized void fail(final MessagingException me)
-   {  
+   {
       log.warn(me.getMessage());
-      
+
       destroy();
 
       // Then call the listeners
@@ -258,7 +258,7 @@
             // executing
             log.error("Failed to execute failure listener", t);
          }
-      }      
+      }
    }
 
    public synchronized void destroy()
@@ -292,22 +292,22 @@
 
    // Private
    // --------------------------------------------------------------------------------------
-      
+
    private void doWrite(final Packet packet)
    {
       if (destroyed)
       {
          throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
       }
-      
+
       MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
       packet.encode(buffer);
 
       transportConnection.write(buffer);
    }
-  
 
+
    // Inner classes
    // --------------------------------------------------------------------------------
 
@@ -323,7 +323,7 @@
 
             fail(me);
          }
-     
+
          gotPong = false;
          firstTime = false;
 
@@ -351,9 +351,9 @@
          return id;
       }
 
-      public void handle(long connectionID, Packet packet)
+      public void handle(Object connectionID, Packet packet)
       {
-         gotPong = true;         
+         gotPong = true;
       }
 
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,17 +18,11 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
+import static org.jboss.messaging.util.DataConstants.*;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -87,33 +81,33 @@
 import org.jboss.messaging.util.OrderedExecutorFactory;
 
 /**
- * 
+ *
  * A RemotingHandlerImpl
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public class RemotingHandlerImpl implements RemotingHandler
-{   
+{
    private static final Logger log = Logger.getLogger(RemotingHandlerImpl.class);
-   
+
    private final PacketDispatcher dispatcher;
 
    private final ExecutorFactory executorFactory;
 
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-   
-   private final ConcurrentMap<Long, Long> lastPings = new ConcurrentHashMap<Long, Long>();
 
-   public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)                              
+   private final ConcurrentMap<Object, Long> lastPings = new ConcurrentHashMap<Object, Long>();
+
+   public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)
    {
       if (dispatcher == null)
       {
          throw new IllegalArgumentException ("argument dispatcher can't be null");
       }
-      
+
       this.dispatcher = dispatcher;
-      
+
       if (executorService != null)
       {
          executorFactory = new OrderedExecutorFactory(executorService);
@@ -123,30 +117,30 @@
          executorFactory = null;
       }
    }
-   
-   public Set<Long> scanForFailedConnections(final long expirePeriod)
+
+   public Set<Object> scanForFailedConnections(final long expirePeriod)
    {
       long now = System.currentTimeMillis();
-      
-      Set<Long> failedIDs = new HashSet<Long>();
-      
-      for (Map.Entry<Long, Long> entry: lastPings.entrySet())
+
+      Set<Object> failedIDs = new HashSet<Object>();
+
+      for (Map.Entry<Object, Long> entry: lastPings.entrySet())
       {
          long lastPing = entry.getValue();
-         
+
          if (now - lastPing > expirePeriod)
          {
             failedIDs.add(entry.getKey());
          }
       }
-      
+
       return failedIDs;
    }
-   
-   public void bufferReceived(final long connectionID, final MessagingBuffer buffer) throws Exception
+
+   public void bufferReceived(final Object connectionID, final MessagingBuffer buffer) throws Exception
    {
       final Packet packet = decode(connectionID, buffer);
-               
+
       if (executorFactory != null)
       {
          long executorID = packet.getExecutorID();
@@ -183,9 +177,9 @@
       else
       {
          dispatcher.dispatch(connectionID, packet);
-      }      
+      }
    }
-   
+
    public int isReadyToHandle(final MessagingBuffer buffer)
    {
       if (buffer.remaining() <= SIZE_INT)
@@ -194,15 +188,15 @@
       }
 
       int length = buffer.getInt();
-      
+
       if (buffer.remaining() < length)
       {
          return -1;
       }
-      
+
       return length;
    }
-   
+
    public void closeExecutor(final long executorID)
    {
       if (executors != null)
@@ -210,28 +204,28 @@
          executors.remove(executorID);
       }
    }
-   
-   public void removeLastPing(final long connectionID)
+
+   public void removeLastPing(final Object connectionID)
    {
       lastPings.remove(connectionID);
    }
-   
+
    // Public ------------------------------------------------------------------------------
-   
+
    public int getNumExecutors()
    {
       return executors.size();
    }
-   
-   public Packet decode(final long connectionID, final MessagingBuffer in) throws Exception
-   {     
+
+   public Packet decode(final Object connectionID, final MessagingBuffer in) throws Exception
+   {
       byte packetType = in.getByte();
-      
+
       Packet packet;
 
       switch (packetType)
       {
-         case NULL:
+         case PacketImpl.NULL:
          {
             packet = new PacketImpl(PacketImpl.NULL);
             break;
@@ -509,12 +503,12 @@
       }
 
       packet.decode(in);
-      
+
       return packet;
 
-   }   
-   
+   }
+
    // Private -----------------------------------------------------------------------------
-   
-   
+
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,11 +18,11 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.*;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -51,13 +51,13 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *  
+ *
  * @version <tt>$Revision$</tt>
  */
 public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
 {
    // Constants -----------------------------------------------------
-  
+
    private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
 
    // Attributes ----------------------------------------------------
@@ -71,37 +71,37 @@
    private final PacketDispatcher dispatcher;
 
    private final ExecutorService remotingExecutor;
-   
+
    private RemotingHandler handler;
-   
+
    private final long connectionExpirePeriod;
-   
-   private final Map<Long, RemotingConnection> connections = new ConcurrentHashMap<Long, RemotingConnection>();
-   
+
+   private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+
    private final Set<AcceptorFactory> acceptorFactories = new HashSet<AcceptorFactory>();
-   
+
    private final Timer failedConnectionTimer = new Timer(true);
-   
+
    private TimerTask failedConnectionsTask;
-      
+
    // Static --------------------------------------------------------
-   
+
    // Constructors --------------------------------------------------
-      
+
    public RemotingServiceImpl(final Configuration config)
    {
       validate(config);
 
       this.config = config;
-      
+
       dispatcher = new PacketDispatcherImpl(null);
 
       remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
-      
+
       handler = new RemotingHandlerImpl(dispatcher, remotingExecutor);
-      
+
       long pingPeriod = config.getConnectionParams().getPingInterval();
-      
+
       if (pingPeriod != -1)
       {
          connectionExpirePeriod = (long)(1.5 * pingPeriod);
@@ -109,8 +109,8 @@
       else
       {
          connectionExpirePeriod = -1;
-      }         
-      
+      }
+
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       for (String interceptorClass : config.getInterceptorClassNames())
       {
@@ -124,12 +124,12 @@
             log.warn("Error instantiating interceptor \"" + interceptorClass + "\"", e);
          }
       }
-      
+
       for (String factoryClass: config.getAcceptorFactoryClassNames())
       {
          try
          {
-            Class<?> clazz = loader.loadClass(factoryClass);            
+            Class<?> clazz = loader.loadClass(factoryClass);
             acceptorFactories.add((AcceptorFactory)clazz.newInstance());
          }
          catch (Exception e)
@@ -138,7 +138,7 @@
          }
       }
    }
-   
+
    // RemotingService implementation -------------------------------
 
    public synchronized void start() throws Exception
@@ -147,23 +147,23 @@
       {
          return;
       }
-                  
+
       for (AcceptorFactory factory: acceptorFactories)
       {
          Acceptor acceptor = factory.createAcceptor(config, handler, this);
-         
+
          acceptors.add(acceptor);
       }
-      
+
       for (Acceptor a : acceptors)
       {
          a.start();
       }
-      
+
       if (connectionExpirePeriod != -1)
       {
          failedConnectionsTask = new FailedConnectionsTask();
-         
+
          failedConnectionTimer.schedule(failedConnectionsTask, 0, 1000);
       }
 
@@ -176,22 +176,22 @@
       {
          return;
       }
-      
+
       if (failedConnectionsTask != null)
       {
          failedConnectionsTask.cancel();
-         
+
          failedConnectionsTask = null;
       }
-      
+
       for (Acceptor acceptor : acceptors)
       {
          acceptor.stop();
       }
-      
+
       started = false;
    }
-   
+
    public boolean isStarted()
    {
       return started;
@@ -206,12 +206,12 @@
    {
       return acceptors;
    }
-   
-   public RemotingConnection getConnection(final long remotingConnectionID)
+
+   public RemotingConnection getConnection(final Object remotingConnectionID)
    {
       return connections.get(remotingConnectionID);
    }
-   
+
    public synchronized void registerAcceptorFactory(final AcceptorFactory factory)
    {
       acceptorFactories.add(factory);
@@ -221,46 +221,46 @@
    {
       acceptorFactories.remove(factory);
    }
-   
+
    public synchronized Set<RemotingConnection> getConnections()
    {
       return new HashSet<RemotingConnection>(connections.values());
    }
 
    // ConnectionLifeCycleListener implementation -----------------------------------
-   
+
    public void connectionCreated(final Connection connection)
    {
       RemotingConnection rc =
          new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout());
-      
-      this.connections.put(connection.getID(), rc);
+
+      connections.put(connection.getID(), rc);
    }
 
-   public void connectionDestroyed(long connectionID)
+   public void connectionDestroyed(Object connectionID)
    {
       handler.removeLastPing(connectionID);
-      
+
       if (connections.remove(connectionID) == null)
       {
          throw new IllegalStateException("Cannot find connection with id " + connectionID);
-      }            
+      }
    }
 
-   public void connectionException(long connectionID, MessagingException me)
+   public void connectionException(Object connectionID, MessagingException me)
    {
       RemotingConnection rc = connections.remove(connectionID);
-      
+
       if (rc == null)
       {
          throw new IllegalStateException("Cannot find connection with id " + connectionID);
       }
-      
+
       rc.fail(me);
    }
-   
+
    // Public --------------------------------------------------------
-   
+
    /*
     * Used in testing
     */
@@ -276,44 +276,46 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-   
+
    private class FailedConnectionsTask extends TimerTask
    {
       private boolean cancelled;
-      
+
+      @Override
       public synchronized void run()
       {
          if (cancelled)
          {
             return;
          }
-         
-         Set<Long> failedIDs = handler.scanForFailedConnections(connectionExpirePeriod);
- 
-         for (long id: failedIDs)
+
+         Set<Object> failedIDs = handler.scanForFailedConnections(connectionExpirePeriod);
+
+         for (Object id: failedIDs)
          {
             RemotingConnection conn = connections.get(id);
-            
+
             if (conn == null)
             {
                throw new IllegalStateException("Cannot find connection with id " + id);
             }
-            
+
             MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
                   "Did not receive ping on connection. It is likely a client has exited or crashed without " +
                   "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
-            
+
             conn.fail(me);
          }
       }
-      
+
+      @Override
       public synchronized boolean cancel()
       {
          cancelled = true;
-         
+
          return super.cancel();
       }
-      
+
    }
 
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl;
 
@@ -26,9 +26,9 @@
 import org.jboss.messaging.core.remoting.ResponseHandler;
 
 /**
- * 
+ *
  * A ResponseHandlerImpl
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -50,7 +50,7 @@
       return id;
    }
 
-   public synchronized void handle(final long remotingConnectionID, final Packet packet)
+   public synchronized void handle(final Object remotingConnectionID, final Packet packet)
    {
       if (failed)
       {
@@ -58,7 +58,7 @@
          return;
       }
 
-      this.response = packet;
+      response = packet;
 
       notify();
    }
@@ -104,7 +104,7 @@
       return response;
    }
 
-   
+
    public void reset()
    {
       response = null;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -95,9 +95,9 @@
       return new IoBufferWrapper(buffer);
    }
 
-   public long getID()
+   public Object getID()
    {
-      return session.getId();
+      return Long.valueOf(session.getId());
    }
 
    public void write(final MessagingBuffer buffer)

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,458 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.messaging.util.DataConstants.*;
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Wraps Netty {@link ChannelBuffer} with {@link MessagingBuffer}.
+ * Because there's neither {@code position()} nor {@code limit()} in a Netty
+ * buffer.  {@link ChannelBuffer#readerIndex()} and {@link ChannelBuffer#writerIndex()}
+ * are used as {@code position} and {@code limit} of the buffer respectively
+ * instead.
+ *
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version $Rev$, $Date$
+ */
+public class ChannelBufferWrapper implements MessagingBuffer
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final ChannelBuffer buffer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ChannelBufferWrapper(final int size)
+   {
+      buffer = dynamicBuffer(size);
+      buffer.writerIndex(buffer.capacity());
+   }
+
+   public ChannelBufferWrapper(final ChannelBuffer buffer)
+   {
+      this.buffer = buffer;
+   }
+
+   // Public --------------------------------------------------------
+
+   // MessagingBuffer implementation ----------------------------------------------
+
+   public byte[] array()
+   {
+      ByteBuffer bb = buffer.toByteBuffer();
+      if (bb.hasArray() && !bb.isReadOnly()) {
+         return bb.array();
+      } else {
+         byte[] ba = new byte[bb.remaining()];
+         bb.get(ba);
+         return ba;
+      }
+   }
+
+   public int position()
+   {
+      return buffer.readerIndex();
+   }
+
+   public void position(final int position)
+   {
+      buffer.readerIndex(position);
+   }
+
+   public int limit()
+   {
+      return buffer.writerIndex();
+   }
+
+   public void limit(final int limit)
+   {
+      buffer.writerIndex(limit);
+   }
+
+   public int capacity()
+   {
+      return buffer.capacity();
+   }
+
+   public void flip()
+   {
+      int oldPosition = position();
+      position(0);
+      limit(oldPosition);
+   }
+
+   public MessagingBuffer slice()
+   {
+      return new ChannelBufferWrapper(buffer.slice());
+   }
+
+   public MessagingBuffer createNewBuffer(int len)
+   {
+      return new ChannelBufferWrapper(len);
+   }
+
+   public int remaining()
+   {
+      return buffer.readableBytes();
+   }
+
+   public void rewind()
+   {
+      position(0);
+      buffer.markReaderIndex();
+   }
+
+   public void putByte(byte byteValue)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeByte(byteValue);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putBytes(final byte[] byteArray)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeBytes(byteArray);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putBytes(final byte[] bytes, int offset, int length)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeBytes(bytes, offset, length);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putInt(final int intValue)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeInt(intValue);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putInt(final int pos, final int intValue)
+   {
+      buffer.setInt(pos, intValue);
+   }
+
+   public void putLong(final long longValue)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeLong(longValue);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putFloat(final float floatValue)
+   {
+      putInt(Float.floatToIntBits(floatValue));
+   }
+
+   public void putDouble(final double d)
+   {
+      putLong(Double.doubleToLongBits(d));
+   }
+
+   public void putShort(final short s)
+   {
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeShort(s);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putChar(final char chr)
+   {
+      putShort((short) chr);
+   }
+
+   public byte getByte()
+   {
+      return buffer.readByte();
+   }
+
+   public short getUnsignedByte()
+   {
+      return buffer.readUnsignedByte();
+   }
+
+   public void getBytes(final byte[] b)
+   {
+      buffer.readBytes(b);
+   }
+
+   public void getBytes(final byte[] b, final int offset, final int length)
+   {
+      buffer.readBytes(b, offset, length);
+   }
+
+   public int getInt()
+   {
+      return buffer.readInt();
+   }
+
+   public long getLong()
+   {
+      return buffer.readLong();
+   }
+
+   public float getFloat()
+   {
+      return Float.intBitsToFloat(getInt());
+   }
+
+   public short getShort()
+   {
+      return buffer.readShort();
+   }
+
+   public int getUnsignedShort()
+   {
+      return buffer.readUnsignedShort();
+   }
+
+   public double getDouble()
+   {
+      return Double.longBitsToDouble(getLong());
+   }
+
+   public char getChar()
+   {
+      return (char) getShort();
+   }
+
+   public void putBoolean(final boolean b)
+   {
+      if (b)
+      {
+         putByte(TRUE);
+      } else
+      {
+         putByte(FALSE);
+      }
+   }
+
+   public boolean getBoolean()
+   {
+      byte b = getByte();
+      return b == TRUE;
+   }
+
+   public void putString(final String nullableString)
+   {
+      putInt(nullableString.length());
+
+      for (int i = 0; i < nullableString.length(); i++)
+      {
+         putChar(nullableString.charAt(i));
+      }
+   }
+
+   public void putNullableString(final String nullableString)
+   {
+      if (nullableString == null)
+      {
+         putByte(NULL);
+      }
+      else
+      {
+         putByte(NOT_NULL);
+         putString(nullableString);
+      }
+   }
+
+   public String getString()
+   {
+      int len = getInt();
+
+      char[] chars = new char[len];
+
+      for (int i = 0; i < len; i++)
+      {
+         chars[i] = getChar();
+      }
+
+      return new String(chars);
+   }
+
+   public String getNullableString()
+   {
+      byte check = getByte();
+
+      if (check == NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return getString();
+      }
+   }
+
+   public void putUTF(final String str) throws Exception
+   {
+      ChannelBuffer encoded = copiedBuffer(str, "UTF-8");
+      int length = encoded.readableBytes();
+      if (length >= 65536) {
+         throw new IllegalArgumentException(
+               "the specified string is too long (" + length + ")");
+      }
+
+      int limit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try {
+         buffer.writeShort((short) length);
+         buffer.writeBytes(encoded);
+      } finally {
+         buffer.readerIndex(buffer.writerIndex());
+         if (limit < buffer.readerIndex()) {
+            limit = buffer.readerIndex();
+         }
+         buffer.writerIndex(limit);
+      }
+   }
+
+   public void putNullableSimpleString(final SimpleString string)
+   {
+      if (string == null)
+      {
+         putByte(NULL);
+      }
+      else
+      {
+         putByte(NOT_NULL);
+         putSimpleString(string);
+      }
+   }
+
+   public void putSimpleString(final SimpleString string)
+   {
+      byte[] data = string.getData();
+
+      putInt(data.length);
+      putBytes(data);
+   }
+
+   public SimpleString getSimpleString()
+   {
+      int len = getInt();
+
+      byte[] data = new byte[len];
+      getBytes(data);
+
+      return new SimpleString(data);
+   }
+
+   public SimpleString getNullableSimpleString()
+   {
+      int b = getByte();
+      if (b == NULL)
+      {
+         return null;
+      }
+      else
+      {
+         return getSimpleString();
+      }
+   }
+
+   public String getUTF() throws Exception
+   {
+      int length = buffer.readUnsignedShort();
+      ChannelBuffer utf8value = buffer.readSlice(length);
+      return utf8value.toString("UTF-8");
+   }
+
+   public Object getUnderlyingBuffer()
+   {
+      return buffer;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @version $Rev$, $Date$
+ */
+public class ChannelPipelineSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   private ChannelPipelineSupport()
+   {
+      // Unused
+   }
+
+   // Public --------------------------------------------------------
+
+   public static void addCodecFilter(final ChannelPipeline pipeline,
+                                     final RemotingHandler handler)
+   {
+      assert pipeline != null;
+      pipeline.addLast("decoder", new MessagingFrameDecoder(handler));
+   }
+
+   public static void addSSLFilter(
+         final ChannelPipeline pipeline, final boolean client,
+         final String keystorePath, final String keystorePassword, final String trustStorePath,
+         final String trustStorePassword) throws Exception
+   {
+      SSLContext context = SSLSupport.getInstance(client, keystorePath, keystorePassword,
+            trustStorePath, trustStorePassword);
+      SSLEngine engine = context.createSSLEngine();
+      if (client)
+      {
+         engine.setUseClientMode(true);
+         engine.setWantClientAuth(true);
+      }
+
+      SslHandler handler = new SslHandler(engine);
+      pipeline.addLast("ssl", handler);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.messaging.util.DataConstants.*;
+
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * A Netty FrameDecoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision$, $Date$
+ */
+public class MessagingFrameDecoder extends FrameDecoder
+{
+   private final RemotingHandler handler;
+
+   public MessagingFrameDecoder(final RemotingHandler handler)
+   {
+      this.handler = handler;
+   }
+
+   // FrameDecoder overrides
+   // -------------------------------------------------------------------------------------
+
+   @Override
+   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
+   {
+      //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+      int start = in.readerIndex();
+
+      int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+      if (length == -1)
+      {
+         in.readerIndex(start);
+         return false;
+      }
+
+      in.readerIndex(start + SIZE_INT);
+      return in.readBytes(length);
+   }
+}
+
+
+
+


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.netty.channel.Channels.*;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChildChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * A Netty TCP Acceptor that supports SSL
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Rev$, $Date$
+ */
+public class NettyAcceptor implements Acceptor
+{
+   private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+
+   private ExecutorService bossExecutor;
+   private ExecutorService workerExecutor;
+   private ChannelFactory channelFactory;
+   private Channel serverChannel;
+   private ServerBootstrap bootstrap;
+   private NettyChildChannelHandler childChannelHandler;
+
+   private final Configuration configuration;
+
+   private final RemotingHandler handler;
+
+   private final ConnectionLifeCycleListener listener;
+
+   public NettyAcceptor(final Configuration configuration, final RemotingHandler handler,
+                       final ConnectionLifeCycleListener listener)
+   {
+      this.configuration = configuration;
+
+      this.handler = handler;
+
+      this.listener = listener;
+   }
+
+   public synchronized void start() throws Exception
+   {
+      if (channelFactory != null)
+      {
+         //Already started
+         return;
+      }
+
+      bossExecutor = Executors.newCachedThreadPool();
+      workerExecutor = Executors.newCachedThreadPool();
+      channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+      bootstrap = new ServerBootstrap(channelFactory);
+      childChannelHandler = new NettyChildChannelHandler();
+      bootstrap.setParentHandler(childChannelHandler);
+
+      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+         public ChannelPipeline getPipeline() throws Exception
+         {
+            ChannelPipeline pipeline = pipeline();
+            if (configuration.isSSLEnabled())
+            {
+               ChannelPipelineSupport.addSSLFilter(pipeline, false, configuration.getKeyStorePath(),
+                       configuration.getKeyStorePassword(),
+                       configuration.getTrustStorePath(),
+                       configuration.getTrustStorePassword());
+            }
+            ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+            pipeline.addLast("handler", new NettyHandler());
+            return pipeline;
+         }
+      });
+
+      // Bind
+      bootstrap.setOption("localAddress", new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+      bootstrap.setOption("child.tcpNoDelay", configuration.getConnectionParams().isTcpNoDelay());
+      int receiveBufferSize = configuration.getConnectionParams().getTcpReceiveBufferSize();
+      if (receiveBufferSize != -1)
+      {
+         bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+      }
+      int sendBufferSize = configuration.getConnectionParams().getTcpSendBufferSize();
+      if (sendBufferSize != -1)
+      {
+         bootstrap.setOption("child.sendBufferSize", sendBufferSize);
+      }
+      bootstrap.setOption("reuseAddress", true);
+      bootstrap.setOption("child.reuseAddress", true);
+      bootstrap.setOption("child.keepAlive", true);
+
+      serverChannel = bootstrap.bind();
+   }
+
+   public synchronized void stop()
+   {
+      if (channelFactory == null)
+      {
+         return;
+      }
+
+      // remove the listener before disposing the acceptor
+      // so that we're not notified when the sessions are destroyed
+      serverChannel.getPipeline().remove(childChannelHandler);
+      serverChannel.close().awaitUninterruptibly();
+      bossExecutor.shutdown();
+      workerExecutor.shutdown();
+      channelFactory = null;
+   }
+
+   // Inner classes -----------------------------------------------------------------------------
+
+   private final class NettyHandler extends SimpleChannelHandler
+   {
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+      {
+         log.error(
+               "caught exception " + e.getCause() + " for channel " +
+               e.getChannel(), e.getCause());
+         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+         me.initCause(e.getCause());
+         listener.connectionException(e.getChannel().getId(), me);
+      }
+
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+      }
+   }
+
+   private final class NettyChildChannelHandler extends SimpleChannelHandler
+   {
+
+      @Override
+      public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception
+      {
+         Connection tc = new NettyConnection(e.getChildChannel());
+         listener.connectionCreated(tc);
+         ctx.sendUpstream(e);
+      }
+
+      @Override
+      public void childChannelClosed(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception
+      {
+         listener.connectionDestroyed(e.getChildChannel().getId());
+         ctx.sendUpstream(e);
+      }
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
+
+/**
+ * A NettyAcceptorFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class NettyAcceptorFactory implements AcceptorFactory
+{
+   public Acceptor createAcceptor(final Configuration configuration,
+                                  final RemotingHandler handler,
+                                  final ConnectionLifeCycleListener listener)
+   {
+      return new NettyAcceptor(configuration, handler, listener);
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * buhnaflagilibrn
+ * @version <tt>$Revision$</tt>
+ */
+public class NettyConnection implements Connection
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final Channel channel;
+
+   private boolean closed;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public NettyConnection(final Channel channel)
+   {
+      this.channel = channel;
+   }
+
+   // Public --------------------------------------------------------
+
+   // Connection implementation ----------------------------
+
+   public synchronized void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      channel.close().awaitUninterruptibly();
+
+      SslHandler sslHandler = (SslHandler) channel.getPipeline().get("ssl");
+      if (sslHandler != null)
+      {
+         try
+         {
+            sslHandler.close(channel).awaitUninterruptibly();
+         }
+         catch (Throwable t)
+         {
+            // ignore
+         }
+      }
+
+      closed = true;
+   }
+
+   public MessagingBuffer createBuffer(int size)
+   {
+      return new ChannelBufferWrapper(size);
+   }
+
+   public Object getID()
+   {
+      return channel.getId();
+   }
+
+   public void write(final MessagingBuffer buffer)
+   {
+      channel.write(buffer.getUnderlyingBuffer());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.netty.channel.Channels.*;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.Connector;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ *
+ * A NettyConnector
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ */
+public class NettyConnector implements Connector
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(NettyConnection.class);
+
+   // Attributes ----------------------------------------------------
+
+   private ExecutorService bossExecutor;
+   private ExecutorService workerExecutor;
+   private ChannelFactory  channelFactory;
+   private ClientBootstrap bootstrap;
+
+   private final RemotingHandler handler;
+
+   private final Location location;
+
+   private final ConnectionLifeCycleListener listener;
+
+   private final ConnectionParams params;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public NettyConnector(final Location location, final ConnectionParams params,
+                        final RemotingHandler handler,
+                        final ConnectionLifeCycleListener listener)
+   {
+      if (location == null)
+      {
+         throw new IllegalArgumentException("Invalid argument null location");
+      }
+
+      if (params == null)
+      {
+         throw new IllegalArgumentException("Invalid argument null connection params");
+      }
+
+      if (handler == null)
+      {
+         throw new IllegalArgumentException("Invalid argument null handler");
+      }
+
+      if (listener == null)
+      {
+         throw new IllegalArgumentException("Invalid argument null listener");
+      }
+
+      this.handler = handler;
+      this.location = location;
+      this.listener = listener;
+      this.params = params;
+   }
+
+   public synchronized void start()
+   {
+      if (channelFactory != null)
+      {
+         return;
+      }
+
+      bossExecutor = Executors.newCachedThreadPool();
+      workerExecutor = Executors.newCachedThreadPool();
+      channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
+      bootstrap = new ClientBootstrap(channelFactory);
+
+      bootstrap.setOption("tcpNoDelay", params.isTcpNoDelay());
+      if (params.getTcpReceiveBufferSize() != -1)
+      {
+         bootstrap.setOption("receiveBufferSize", params.getTcpReceiveBufferSize());
+      }
+      if (params.getTcpSendBufferSize() != -1)
+      {
+         bootstrap.setOption("sendBufferSize", params.getTcpSendBufferSize());
+      }
+      bootstrap.setOption("keepAlive", true);
+      bootstrap.setOption("reuseAddress", true);
+
+      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+         public ChannelPipeline getPipeline() throws Exception
+         {
+            ChannelPipeline pipeline = pipeline();
+            if (params.isSSLEnabled())
+            {
+               try
+               {
+                  ChannelPipelineSupport.addSSLFilter(pipeline, true, params.getKeyStorePath(), params.getKeyStorePassword(), null, null);
+               }
+               catch (Exception e)
+               {
+                  IllegalStateException ise = new IllegalStateException("Unable to create MinaConnection for " + location);
+                  ise.initCause(e);
+                  throw ise;
+               }
+            }
+            ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+            pipeline.addLast("handler", new NettyHandler());
+            return pipeline;
+         }
+      });
+   }
+
+   public synchronized void close()
+   {
+      if (channelFactory == null)
+      {
+         return;
+      }
+
+      bossExecutor.shutdown();
+      workerExecutor.shutdown();
+      bootstrap = null;
+      channelFactory = null;
+   }
+
+   public Connection createConnection()
+   {
+      InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
+      ChannelFuture future = bootstrap.connect(address);
+      future.awaitUninterruptibly();
+
+      if (future.isSuccess())
+      {
+         return new NettyConnection(future.getChannel());
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private final class NettyHandler extends SimpleChannelHandler
+   {
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+      }
+
+      @Override
+      public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+      {
+         listener.connectionDestroyed(e.getChannel().getId());
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+      {
+         log.error(
+               "caught exception " + e.getCause() + " for session " +
+               e.getChannel(), e.getCause());
+
+         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+         me.initCause(e.getCause());
+         listener.connectionException(e.getChannel().getId(), me);
+      }
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,22 @@
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connector;
+import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
+
+/**
+ * A NettyConnectorFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class NettyConnectorFactory implements ConnectorFactory
+{
+   public Connector createConnector(final Location location, final ConnectionParams params,
+         final RemotingHandler handler, final ConnectionLifeCycleListener listener)
+   {
+      return new NettyConnector(location, params, handler, listener);
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,26 +18,26 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.spi;
 
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 
 /**
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
+ *
  * @version <tt>$Revision$</tt>
- * 
+ *
  */
 public interface Connection
 {
    MessagingBuffer createBuffer(int size);
-   
-   long getID();
 
+   Object getID();
+
    void write(MessagingBuffer buffer);
 
-   void close();   
+   void close();
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,13 +18,11 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -49,13 +47,13 @@
    private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
 
    private final MessagingServer server;
-   
+
    private final RemotingService remotingService;
 
    public MessagingServerPacketHandler(final MessagingServer server, final RemotingService remotingService)
    {
       this.server = server;
-      
+
       this.remotingService = remotingService;
    }
 
@@ -64,15 +62,15 @@
       //0 is reserved for this handler
       return 0;
    }
-   
-   public void handle(final long connectionID, final Packet packet)
+
+   public void handle(final Object connectionID, final Packet packet)
    {
       Packet response = null;
-            
+
       RemotingConnection connection = remotingService.getConnection(connectionID);
 
       byte type = packet.getType();
-      
+
       try
       {
          if (type == PING)
@@ -80,13 +78,13 @@
             response = new PacketImpl(PONG);
          }
          else if (type == CREATECONNECTION)
-         {            
+         {
             CreateConnectionRequest request = (CreateConnectionRequest) packet;
-   
+
             response =
-               server.createConnection(request.getUsername(), request.getPassword(),                             
+               server.createConnection(request.getUsername(), request.getPassword(),
                                        request.getVersion(),
-                                       connection);               
+                                       connection);
          }
          else
          {
@@ -97,23 +95,23 @@
       catch (Throwable t)
       {
          MessagingException me;
-         
-         log.error("Caught unexpected exception", t);         
-         
+
+         log.error("Caught unexpected exception", t);
+
          if (t instanceof MessagingException)
          {
             me = (MessagingException)t;
          }
          else
-         {            
+         {
             me = new MessagingException(MessagingException.INTERNAL_ERROR);
          }
-                  
-         response = new MessagingExceptionMessage(me);    
+
+         response = new MessagingExceptionMessage(me);
       }
-      
+
       response.normalize(packet);
-      
+
       connection.sendOneWay(response);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,16 +18,11 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -54,11 +49,11 @@
 
 /**
  * Concrete implementation of BrowserEndpoint.
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @version <tt>$Revision: 3778 $</tt>
- * 
+ *
  * $Id: ServerBrowserImpl.java 3778 2008-02-24 12:15:29Z timfox $
  */
 public class ServerBrowserImpl
@@ -77,29 +72,29 @@
    private final Filter filter;
    private Iterator<ServerMessage> iterator;
    private final RemotingConnection remotingConnection;
-   
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerBrowserImpl(final ServerSession session,
                             final Queue destination, final String messageFilter,
                             final PacketDispatcher dispatcher,
                             final RemotingConnection remotingConnection) throws MessagingException
-   {     
+   {
       this.session = session;
-      
-      this.id = dispatcher.generateID();
-      
+
+      id = dispatcher.generateID();
+
       this.destination = destination;
 
 		if (messageFilter != null)
-		{	
+		{
 		   filter = new FilterImpl(new SimpleString(messageFilter));
 		}
 		else
 		{
 		   filter = null;
 		}
-		
+
 		this.remotingConnection = remotingConnection;
    }
 
@@ -109,7 +104,7 @@
    {
    	return id;
    }
-   
+
    public void reset() throws Exception
    {
       iterator = createIterator();
@@ -126,7 +121,7 @@
 
       return has;
    }
-   
+
    public ServerMessage nextMessage() throws Exception
    {
       if (iterator == null)
@@ -161,29 +156,33 @@
             messages.add(m);
             i++;
          }
-         else break;
-      }		
-		return (Message[])messages.toArray(new Message[messages.size()]);	
+         else
+         {
+            break;
+         }
+      }
+		return messages.toArray(new Message[messages.size()]);
    }
-   
+
    public void close() throws Exception
    {
       iterator = null;
-      
+
       session.removeBrowser(this);
-      
+
       log.trace(this + " closed");
    }
-           
+
    // Public ---------------------------------------------------------------------------------------
 
+   @Override
    public String toString()
    {
       return "BrowserEndpoint[" + id + "]";
    }
 
    // Package protected ----------------------------------------------------------------------------
-   
+
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
@@ -191,14 +190,14 @@
    private Iterator<ServerMessage> createIterator()
    {
       List<MessageReference> refs = destination.list(filter);
-      
+
       List<ServerMessage> msgs = new ArrayList<ServerMessage>();
-      
+
       for (MessageReference ref: refs)
       {
          msgs.add(ref.getMessage());
       }
-      
+
       return msgs.iterator();
    }
 
@@ -208,15 +207,15 @@
    }
 
    // Inner classes --------------------------------------------------------------------------------
-   
+
    private class ServerBrowserEndpointHandler implements PacketHandler
    {
       public long getID()
       {
-         return ServerBrowserImpl.this.id;
+         return id;
       }
-      
-      public void handle(final long connectionID, final Packet packet)
+
+      public void handle(final Object connectionID, final Packet packet)
       {
          Packet response = null;
 
@@ -226,19 +225,19 @@
             switch (type)
             {
             case SESS_BROWSER_HASNEXTMESSAGE:
-               response = new SessionBrowserHasNextMessageResponseMessage(hasNextMessage());            
+               response = new SessionBrowserHasNextMessageResponseMessage(hasNextMessage());
                break;
             case SESS_BROWSER_NEXTMESSAGE:
-               ServerMessage message = nextMessage();               
+               ServerMessage message = nextMessage();
                response = new ReceiveMessage(message, 0, 0);
                break;
-            case SESS_BROWSER_RESET:            
+            case SESS_BROWSER_RESET:
                reset();
-               response = new PacketImpl(NULL); 
+               response = new PacketImpl(NULL);
                break;
             case CLOSE:
                close();
-               response = new PacketImpl(NULL); 
+               response = new PacketImpl(NULL);
                break;
             default:
                response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
@@ -248,24 +247,24 @@
          catch (Throwable t)
          {
             MessagingException me;
-            
-            log.error("Caught unexpected exception", t);         
-            
+
+            log.error("Caught unexpected exception", t);
+
             if (t instanceof MessagingException)
             {
                me = (MessagingException)t;
             }
             else
-            {            
+            {
                me = new MessagingException(MessagingException.INTERNAL_ERROR);
             }
-                     
-            response = new MessagingExceptionMessage(me);    
+
+            response = new MessagingExceptionMessage(me);
          }
-         
+
          response.normalize(packet);
-         
-         remotingConnection.sendOneWay(response);         
+
+         remotingConnection.sendOneWay(response);
       }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
@@ -43,13 +43,13 @@
    private static final Logger log = Logger.getLogger(ServerConnectionPacketHandler.class);
 
    private final ServerConnection connection;
-   
+
    private final RemotingConnection remotingConnection;
 
    public ServerConnectionPacketHandler(final ServerConnection connection, final RemotingConnection remotingConnection)
    {
       this.connection = connection;
-      
+
       this.remotingConnection = remotingConnection;
    }
 
@@ -58,14 +58,14 @@
       return connection.getID();
    }
 
-   public void handle(final long remotingConnectionID, final Packet packet)
+   public void handle(final Object remotingConnectionID, final Packet packet)
    {
       Packet response = null;
 
       byte type = packet.getType();
 
       try
-      {      
+      {
          switch (type)
          {
             case PacketImpl.CONN_CREATESESSION:
@@ -87,26 +87,26 @@
             default:
                response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
                      "Unsupported packet " + type));
-         }      
+         }
       }
       catch (Throwable t)
       {
          MessagingException me;
-         
-         log.error("Caught unexpected exception", t);         
-         
+
+         log.error("Caught unexpected exception", t);
+
          if (t instanceof MessagingException)
          {
             me = (MessagingException)t;
          }
          else
-         {            
+         {
             me = new MessagingException(MessagingException.INTERNAL_ERROR);
          }
-                  
-         response = new MessagingExceptionMessage(me);    
+
+         response = new MessagingExceptionMessage(me);
       }
-      
+
       if (response instanceof MessagingExceptionMessage)
       {
          MessagingExceptionMessage mee = (MessagingExceptionMessage)response;
@@ -115,8 +115,8 @@
       if (response != null)
       {
          response.normalize(packet);
-         
-         remotingConnection.sendOneWay(response);    
+
+         remotingConnection.sendOneWay(response);
       }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
@@ -33,7 +33,7 @@
 import org.jboss.messaging.core.server.ServerConsumer;
 
 /**
- * 
+ *
  * A ServerConsumerPacketHandler
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -45,13 +45,13 @@
    private static final Logger log = Logger.getLogger(ServerConsumerPacketHandler.class);
 
 	private final ServerConsumer consumer;
-	
+
 	private final RemotingConnection remotingConnection;
-	
+
 	public ServerConsumerPacketHandler(final ServerConsumer consumer, final RemotingConnection remotingConnection)
 	{
 		this.consumer = consumer;
-		
+
 		this.remotingConnection = remotingConnection;
 	}
 
@@ -60,12 +60,12 @@
       return consumer.getID();
    }
 
-   public void handle(final long remotingConnectionID, final Packet packet)
+   public void handle(final Object remotingConnectionID, final Packet packet)
    {
       Packet response = null;
 
       byte type = packet.getType();
-      
+
       try
       {
          switch (type)
@@ -76,7 +76,7 @@
             break;
          case PacketImpl.CLOSE:
             consumer.close();
-            response = new PacketImpl(PacketImpl.NULL);     
+            response = new PacketImpl(PacketImpl.NULL);
             break;
          default:
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
@@ -86,26 +86,26 @@
       catch (Throwable t)
       {
          MessagingException me;
-         
-         log.error("Caught unexpected exception", t);         
-         
+
+         log.error("Caught unexpected exception", t);
+
          if (t instanceof MessagingException)
          {
             me = (MessagingException)t;
          }
          else
-         {            
+         {
             me = new MessagingException(MessagingException.INTERNAL_ERROR);
          }
-                  
-         response = new MessagingExceptionMessage(me);    
+
+         response = new MessagingExceptionMessage(me);
       }
 
       if (response != null)
       {
          response.normalize(packet);
-         
-         remotingConnection.sendOneWay(response);    
+
+         remotingConnection.sendOneWay(response);
       }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
@@ -33,24 +33,24 @@
 import org.jboss.messaging.core.server.ServerProducer;
 
 /**
- * 
+ *
  * A ServerProducerPacketHandler
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public class ServerProducerPacketHandler implements PacketHandler
 {
    private static final Logger log = Logger.getLogger(ServerProducerPacketHandler.class);
-  
+
 	private final ServerProducer producer;
-	
+
 	private final RemotingConnection remotingConnection;
-	
+
 	public ServerProducerPacketHandler(final ServerProducer producer, final RemotingConnection remotingConnection)
 	{
 		this.producer = producer;
-		
+
 		this.remotingConnection = remotingConnection;
 	}
 
@@ -59,14 +59,14 @@
       return producer.getID();
    }
 
-   public void handle(final long remotingConnectionID, final Packet packet)
+   public void handle(final Object remotingConnectionID, final Packet packet)
    {
       Packet response = null;
 
       byte type = packet.getType();
-      
+
       try
-      {      
+      {
          switch (type)
          {
          case PacketImpl.PROD_SEND:
@@ -89,26 +89,26 @@
       catch (Throwable t)
       {
          MessagingException me;
-         
-         log.error("Caught unexpected exception", t);         
-         
+
+         log.error("Caught unexpected exception", t);
+
          if (t instanceof MessagingException)
          {
             me = (MessagingException)t;
          }
          else
-         {            
+         {
             me = new MessagingException(MessagingException.INTERNAL_ERROR);
          }
-                  
-         response = new MessagingExceptionMessage(me);    
+
+         response = new MessagingExceptionMessage(me);
       }
 
       if (response != null)
       {
          response.normalize(packet);
-         
-         remotingConnection.sendOneWay(response);    
+
+         remotingConnection.sendOneWay(response);
       }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,12 +18,10 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NO_ID_SET;
-
 import java.util.List;
 
 import javax.transaction.xa.Xid;
@@ -61,9 +59,9 @@
 import org.jboss.messaging.core.server.ServerSession;
 
 /**
- * 
+ *
  * A ServerSessionPacketHandler
- * 
+ *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -71,15 +69,15 @@
 public class ServerSessionPacketHandler implements PacketHandler
 {
 	private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
-	
+
 	private final ServerSession session;
-	
+
 	private final RemotingConnection remotingConnection;
-	
+
 	public ServerSessionPacketHandler(final ServerSession session, final RemotingConnection remotingConnection)
    {
 		this.session = session;
-		
+
 		this.remotingConnection = remotingConnection;
    }
 
@@ -88,20 +86,20 @@
       return session.getID();
    }
 
-   public void handle(final long remotingConnectionID, final Packet packet)
+   public void handle(final Object remotingConnectionID, final Packet packet)
    {
       Packet response = null;
 
       byte type = packet.getType();
-      
+
       try
-      {      
+      {
          switch (type)
          {
          case PacketImpl.SESS_CREATECONSUMER:
          {
             SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-            
+
             response = session.createConsumer(request.getClientTargetID(), request.getQueueName(), request.getFilterString(),
             		                            request.isNoLocal(), request.isAutoDeleteQueue(),
             		                            request.getWindowSize(), request.getMaxRate());
@@ -268,31 +266,31 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
                   "Unsupported packet " + type));
          }
-      
+
       }
       catch (Throwable t)
       {
          MessagingException me;
-         
-         log.error("Caught unexpected exception", t);         
-         
+
+         log.error("Caught unexpected exception", t);
+
          if (t instanceof MessagingException)
          {
             me = (MessagingException)t;
          }
          else
-         {            
+         {
             me = new MessagingException(MessagingException.INTERNAL_ERROR);
          }
-                  
-         response = new MessagingExceptionMessage(me);    
+
+         response = new MessagingExceptionMessage(me);
       }
 
       if (response != null)
       {
          response.normalize(packet);
-         
-         remotingConnection.sendOneWay(response);    
+
+         remotingConnection.sendOneWay(response);
       }
    }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -35,9 +35,9 @@
 import org.jboss.messaging.tests.util.UnitTestCase;
 
 /**
- * 
+ *
  * A RemotingHandlerImplTest
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -48,6 +48,7 @@
    private PacketDispatcher dispatcher;
    private ExecutorService executorService;
 
+   @Override
    protected void setUp() throws Exception
    {
       super.setUp();
@@ -57,67 +58,68 @@
       buff = new ByteBufferWrapper(ByteBuffer.allocate(1024));
    }
 
+   @Override
    protected void tearDown() throws Exception
    {
       super.tearDown();
       handler = null;
       buff = null;
    }
-   
+
    public void testScanForFailedConnections() throws Exception
    {
       final long expirePeriod = 100;
-      
+
       MessagingBuffer buff1 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
       Packet ping1 = new PacketImpl(PacketImpl.PING);
-      ping1.encode(buff1); 
+      ping1.encode(buff1);
       buff1.getInt();
       final long connectionID1 = 120912;
-      
+
       MessagingBuffer buff2 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
       Packet ping2 = new PacketImpl(PacketImpl.PING);
-      ping2.encode(buff2);      
+      ping2.encode(buff2);
       buff2.getInt();
       final long connectionID2 = 12023;
-      
+
       MessagingBuffer buff3 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
       Packet ping3 = new PacketImpl(PacketImpl.PING);
-      ping3.encode(buff3);      
+      ping3.encode(buff3);
       buff3.getInt();
       final long connectionID3 = 1123128;
-      
+
       MessagingBuffer buff4 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
       Packet ping4 = new PacketImpl(PacketImpl.PING);
       ping4.encode(buff4);
       buff4.getInt();
       final long connectionID4 = 127987;
-      
-      Set<Long> failed = handler.scanForFailedConnections(expirePeriod);
-      
+
+      Set<Object> failed = handler.scanForFailedConnections(expirePeriod);
+
       assertEquals(0, failed.size());
-      
-      handler.bufferReceived(connectionID1, buff1);      
+
+      handler.bufferReceived(connectionID1, buff1);
       handler.bufferReceived(connectionID2, buff2);
       handler.bufferReceived(connectionID3, buff3);
       handler.bufferReceived(connectionID4, buff4);
-      
-      failed = handler.scanForFailedConnections(expirePeriod);      
+
+      failed = handler.scanForFailedConnections(expirePeriod);
       assertEquals(0, failed.size());
-      
+
       Thread.sleep(expirePeriod + 10);
-      
-      failed = handler.scanForFailedConnections(expirePeriod);      
+
+      failed = handler.scanForFailedConnections(expirePeriod);
       assertEquals(4, failed.size());
-      
-      handler.removeLastPing(connectionID1);      
-      handler.removeLastPing(connectionID2);      
-      handler.removeLastPing(connectionID3);      
+
+      handler.removeLastPing(connectionID1);
+      handler.removeLastPing(connectionID2);
+      handler.removeLastPing(connectionID3);
       handler.removeLastPing(connectionID4);
-      
-      failed = handler.scanForFailedConnections(expirePeriod);      
-      assertEquals(0, failed.size());                 
+
+      failed = handler.scanForFailedConnections(expirePeriod);
+      assertEquals(0, failed.size());
    }
-   
 
+
 }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -35,9 +35,9 @@
 import org.jboss.messaging.tests.util.UnitTestCase;
 
 /**
- * 
+ *
  * A RemotingServiceImplTest
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -49,51 +49,51 @@
        final long interval = 100;
        config.getConnectionParams().setPingInterval(interval);
        RemotingServiceImpl remotingService = new RemotingServiceImpl(config);
-       
+
        RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
        remotingService.setHandler(handler);
-       
-       Set<Long> failed = new HashSet<Long>();
-       
+
+       Set<Object> failed = new HashSet<Object>();
+
        EasyMock.expect(handler.scanForFailedConnections((long)(1.5 * interval))).andReturn(failed);
-              
+
        EasyMock.replay(handler);
-       
+
        remotingService.start();
-       
+
        Thread.sleep(interval * 2);
-              
+
        EasyMock.verify(handler);
-                     
+
     }
-    
+
     public void testScanForFailedConnectionsFailed() throws Exception
     {
        ConfigurationImpl config = new ConfigurationImpl();
        final long interval = 100;
        config.getConnectionParams().setPingInterval(interval);
        RemotingServiceImpl remotingService = new RemotingServiceImpl(config);
-       
+
        RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
        remotingService.setHandler(handler);
-       
-       Set<Long> failed = new HashSet<Long>();
+
+       Set<Object> failed = new HashSet<Object>();
        failed.add(2L);
        failed.add(3L);
-       
+
        EasyMock.expect(handler.scanForFailedConnections((long)(1.5 * interval))).andReturn(failed);
-       
+
        Connection conn1 = EasyMock.createStrictMock(Connection.class);
        Connection conn2 = EasyMock.createStrictMock(Connection.class);
        Connection conn3 = EasyMock.createStrictMock(Connection.class);
-                                   
+
        EasyMock.expect(conn1.getID()).andStubReturn(1);
        EasyMock.expect(conn2.getID()).andStubReturn(2);
        EasyMock.expect(conn3.getID()).andStubReturn(3);
-       
+
        conn2.close();
-       conn3.close();       
-       
+       conn3.close();
+
        class Listener implements FailureListener
        {
           volatile MessagingException me;
@@ -102,39 +102,39 @@
              this.me = me;
           }
        }
-                           
+
        EasyMock.replay(handler, conn1, conn2, conn3);
-       
+
        remotingService.start();
-       
+
        remotingService.connectionCreated(conn1);
        remotingService.connectionCreated(conn2);
        remotingService.connectionCreated(conn3);
-       
+
        RemotingConnection rc1 = remotingService.getConnection(1);
        RemotingConnection rc2 = remotingService.getConnection(2);
        RemotingConnection rc3 = remotingService.getConnection(3);
-       
+
        Listener listener1 = new Listener();
        rc1.addFailureListener(listener1);
-       
+
        Listener listener2 = new Listener();
        rc2.addFailureListener(listener2);
-       
+
        Listener listener3 = new Listener();
        rc3.addFailureListener(listener3);
-       
+
        Thread.sleep(interval * 2);
-              
+
        EasyMock.verify(handler, conn1, conn2, conn3);
-       
+
        assertNull(listener1.me);
        assertNotNull(listener2.me);
        assertNotNull(listener3.me);
-       
+
        assertEquals(MessagingException.CONNECTION_TIMEDOUT, listener2.me.getCode());
        assertEquals(MessagingException.CONNECTION_TIMEDOUT, listener3.me.getCode());
-                     
+
     }
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java	2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,18 +18,12 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.tests.unit.core.remoting;
 
-import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
-import static org.jboss.messaging.tests.util.RandomUtil.randomBytes;
-import static org.jboss.messaging.tests.util.RandomUtil.randomDouble;
-import static org.jboss.messaging.tests.util.RandomUtil.randomFloat;
-import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-import static org.jboss.messaging.tests.util.UnitTestCase.assertEqualsByteArrays;
+import static org.jboss.messaging.tests.util.RandomUtil.*;
+import static org.jboss.messaging.tests.util.UnitTestCase.*;
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.remoting.MessagingBuffer;
@@ -38,7 +32,7 @@
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
+ *
  * @version <tt>$Revision$</tt>
  */
 public abstract class MessagingBufferTestBase extends TestCase
@@ -73,7 +67,7 @@
    {
       assertNull(putAndGetNullableString(null));
    }
-   
+
    public void testEmptyString() throws Exception
    {
       String result = putAndGetNullableString("");
@@ -85,7 +79,7 @@
    public void testNonEmptyString() throws Exception
    {
       String junk = randomString();
-      
+
       String result = putAndGetNullableString(junk);
 
       assertNotNull(result);
@@ -96,7 +90,7 @@
    {
       assertNull(putAndGetNullableSimpleString(null));
    }
-   
+
    public void testEmptySimpleString() throws Exception
    {
       SimpleString emptySimpleString = new SimpleString("");
@@ -119,53 +113,53 @@
    {
       byte b = randomByte();
       wrapper.putByte(b);
-      
+
       wrapper.flip();
-      
+
       assertEquals(b, wrapper.getByte());
    }
-   
+
    public void testUnsignedByte() throws Exception
    {
       byte b = (byte) 0xff;
       wrapper.putByte(b);
-      
+
       wrapper.flip();
-      
+
       assertEquals(255, wrapper.getUnsignedByte());
 
       wrapper.rewind();
-      
+
       b = (byte) 0xf;
       wrapper.putByte(b);
-      
+
       wrapper.flip();
-      
+
       assertEquals(b, wrapper.getUnsignedByte());
    }
-   
-   
-   
+
+
+
    public void testBytes() throws Exception
    {
       byte[] bytes = randomBytes();
       wrapper.putBytes(bytes);
-      
+
       wrapper.flip();
-      
+
       byte[] b = new byte[bytes.length];
       wrapper.getBytes(b);
       assertEqualsByteArrays(bytes, b);
    }
-   
+
    public void testBytesWithLength() throws Exception
    {
       byte[] bytes = randomBytes();
       // put only half of the bytes
       wrapper.putBytes(bytes, 0, bytes.length / 2);
-      
+
       wrapper.flip();
-      
+
       byte[] b = new byte[bytes.length / 2];
       wrapper.getBytes(b, 0, b.length);
       assertEqualsByteArrays(b.length, bytes, b);
@@ -174,151 +168,151 @@
    public void testPutTrueBoolean() throws Exception
    {
       wrapper.putBoolean(true);
-      
+
       wrapper.flip();
-      
+
       assertTrue(wrapper.getBoolean());
    }
 
    public void testPutFalseBoolean() throws Exception
    {
       wrapper.putBoolean(false);
-      
+
       wrapper.flip();
-      
+
       assertFalse(wrapper.getBoolean());
    }
-      
+
    public void testChar() throws Exception
    {
       wrapper.putChar('a');
-      
+
       wrapper.flip();
-      
+
       assertEquals('a', wrapper.getChar());
    }
-   
+
    public void testInt() throws Exception
    {
       int i = randomInt();
       wrapper.putInt(i);
-      
+
       wrapper.flip();
-      
+
       assertEquals(i, wrapper.getInt());
    }
-   
+
    public void testIntAtPosition() throws Exception
    {
       int firstInt = randomInt();
       int secondInt = randomInt();
-      
+
       wrapper.putInt(secondInt);
       wrapper.putInt(secondInt);
       // rewrite firstInt at the beginning
       wrapper.putInt(0, firstInt);
 
       wrapper.flip();
-      
+
       assertEquals(firstInt, wrapper.getInt());
       assertEquals(secondInt, wrapper.getInt());
    }
-   
+
    public void testLong() throws Exception
    {
       long l = randomLong();
       wrapper.putLong(l);
-      
+
       wrapper.flip();
-      
+
       assertEquals(l, wrapper.getLong());
    }
-   
+
    public void testUnsignedShort() throws Exception
    {
       short s1 = Short.MAX_VALUE;
-      
+
       wrapper.putShort(s1);
-      
+
       wrapper.flip();
-      
+
       int s2 = wrapper.getUnsignedShort();
-      
-      assertEquals((int) s1, s2);
-      
+
+      assertEquals(s1, s2);
+
       wrapper.rewind();
-      
+
       s1 = Short.MIN_VALUE;
-      
+
       wrapper.putShort(s1);
-      
+
       wrapper.flip();
-      
+
       s2 = wrapper.getUnsignedShort();
-      
-      assertEquals(((int) s1) * -1, s2);
-      
+
+      assertEquals(s1 * -1, s2);
+
       wrapper.rewind();
-      
+
       s1 = -1;
-      
+
       wrapper.putShort(s1);
-      
+
       wrapper.flip();
-      
+
       s2 = wrapper.getUnsignedShort();
-      
+
       // / The max of an unsigned short
       // (http://en.wikipedia.org/wiki/Unsigned_short)
       assertEquals(s2, 65535);
    }
-   
+
    public void testShort() throws Exception
    {
       wrapper.putShort((short) 1);
-      
+
       wrapper.flip();
-      
+
       assertEquals((short)1, wrapper.getShort());
    }
-      
+
    public void testDouble() throws Exception
    {
       double d = randomDouble();
       wrapper.putDouble(d);
-      
+
       wrapper.flip();
-      
+
       assertEquals(d, wrapper.getDouble());
    }
-   
+
    public void testFloat() throws Exception
    {
       float f = randomFloat();
       wrapper.putFloat(f);
-      
+
       wrapper.flip();
-      
+
       assertEquals(f, wrapper.getFloat());
    }
-   
+
    public void testUTF() throws Exception
    {
       String str = randomString();
       wrapper.putUTF(str);
-      
+
       wrapper.flip();
-      
+
       assertEquals(str, wrapper.getUTF());
    }
-   
+
    public void testArray() throws Exception
    {
       byte[] bytes = randomBytes(128);
       wrapper.putBytes(bytes);
-      
+
       wrapper.flip();
-      
+
       byte[] array = wrapper.array();
       assertEquals(wrapper.capacity(), array.length);
       assertEqualsByteArrays(128, bytes, wrapper.array());
@@ -328,16 +322,16 @@
    {
       int i = randomInt();
       wrapper.putInt(i);
-      
+
       wrapper.flip();
-      
+
       assertEquals(i, wrapper.getInt());
-      
+
       wrapper.rewind();
 
       assertEquals(i, wrapper.getInt());
    }
-   
+
    public void testRemaining() throws Exception
    {
       int capacity = wrapper.capacity();
@@ -347,22 +341,22 @@
       int fill = capacity / 3;
       byte[] bytes = randomBytes(fill);
       wrapper.putBytes(bytes);
-      
+
       // check the remaining is 2/3
       assertEquals(capacity - fill, wrapper.remaining());
    }
-   
+
    public void testPosition() throws Exception
    {
       assertEquals(0, wrapper.position());
-      
+
       byte[] bytes = randomBytes(128);
       wrapper.putBytes(bytes);
-      
+
       assertEquals(bytes.length, wrapper.position());
-      
+
       wrapper.position(0);
-      assertEquals(0, wrapper.position());      
+      assertEquals(0, wrapper.position());
    }
 
    public void testLimit() throws Exception
@@ -372,12 +366,12 @@
       byte[] bytes = randomBytes(128);
       wrapper.putBytes(bytes);
 
-      assertEquals(wrapper.capacity(), wrapper.limit()); 
-      
+      assertTrue(wrapper.limit() >= bytes.length);
+
       wrapper.limit(128);
       assertEquals(128, wrapper.limit());
    }
-   
+
    public void testSlice() throws Exception
    {
       byte[] bytes = randomBytes(128);
@@ -385,16 +379,16 @@
 
       wrapper.position(0);
       wrapper.limit(128);
-      
+
       MessagingBuffer slicedBuffer = wrapper.slice();
       assertEquals(128, slicedBuffer.capacity());
-      
+
       byte[] slicedBytes = new byte[128];
       slicedBuffer.getBytes(slicedBytes);
-      
+
       assertEqualsByteArrays(bytes, slicedBytes);
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -406,18 +400,18 @@
       wrapper.putNullableString(nullableString);
 
       wrapper.flip();
-      
+
       return wrapper.getNullableString();
    }
-   
+
    private SimpleString putAndGetNullableSimpleString(SimpleString nullableSimpleString) throws Exception
    {
       wrapper.putNullableSimpleString(nullableSimpleString);
 
       wrapper.flip();
-      
+
       return wrapper.getNullableSimpleString();
    }
-   
+
    // Inner classes -------------------------------------------------
 }

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.ChannelBufferWrapper;
+import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
+
+/**
+ * Same as ChannelBufferWrapperTest, but using a different constructor
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ChannelBufferWrapper2Test extends MessagingBufferTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // BufferWrapperBase overrides -----------------------------------
+
+   @Override
+   protected MessagingBuffer createBuffer()
+   {
+      return new ChannelBufferWrapper(512);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.ChannelBufferWrapper;
+import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ChannelBufferWrapperTest extends MessagingBufferTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // BufferWrapperBase overrides -----------------------------------
+
+   @Override
+   protected MessagingBuffer createBuffer()
+   {
+      ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(512);
+      buffer.writerIndex(buffer.capacity());
+      return new ChannelBufferWrapper(buffer);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptor;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A MinaAcceptorFactoryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyAcceptorFactoryTest extends UnitTestCase
+{
+   public void testCreateAcceptor() throws Exception
+   {
+      NettyAcceptorFactory factory = new NettyAcceptorFactory();
+
+      RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+      Configuration config = new ConfigurationImpl();
+      ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+      Acceptor acceptor = factory.createAcceptor(config, handler, listener);
+
+      assertTrue(acceptor instanceof NettyAcceptor);
+   }
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptor;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A MinaAcceptorTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyAcceptorTest extends UnitTestCase
+{
+   public void testStartStop() throws Exception
+   {
+      RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+      Configuration config = new ConfigurationImpl();
+      ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+      NettyAcceptor acceptor = new NettyAcceptor(config, handler, listener);
+
+      acceptor.start();
+      acceptor.stop();
+      acceptor.start();
+      acceptor.stop();
+
+   }
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import java.util.UUID;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.NettyConnection;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.netty.channel.Channel;
+
+/**
+ *
+ * A NettyConnectionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyConnectionTest extends UnitTestCase
+{
+   public void testGetID() throws Exception
+   {
+      Channel channel = EasyMock.createStrictMock(Channel.class);
+
+      final UUID id = UUID.randomUUID();
+
+      EasyMock.expect(channel.getId()).andReturn(id);
+
+      NettyConnection conn = new NettyConnection(channel);
+
+      EasyMock.replay(channel);
+
+      assertEquals(id, conn.getID());
+
+      EasyMock.verify(channel);
+   }
+
+   public void testWrite() throws Exception
+   {
+      Channel channel = EasyMock.createStrictMock(Channel.class);
+
+      final Object underlying = new Object();
+
+      MessagingBuffer buff = EasyMock.createStrictMock(MessagingBuffer.class);
+
+      EasyMock.expect(buff.getUnderlyingBuffer()).andReturn(underlying);
+
+      EasyMock.expect(channel.write(underlying)).andReturn(null);
+
+      NettyConnection conn = new NettyConnection(channel);
+
+      EasyMock.replay(channel, buff);
+
+      conn.write(buff);
+
+      EasyMock.verify(channel, buff);
+   }
+
+   public void testCreateBuffer() throws Exception
+   {
+      Channel channel = EasyMock.createStrictMock(Channel.class);
+
+      NettyConnection conn = new NettyConnection(channel);
+
+      EasyMock.replay(channel);
+
+      final int size = 1234;
+
+      MessagingBuffer buff = conn.createBuffer(size);
+      buff.putByte((byte) 0x00); // Netty buffer does lazy initialization.
+      assertEquals(size, buff.capacity());
+
+      EasyMock.verify(channel);
+   }
+
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * 
+ * A MinaConnectorTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyConnectorTest extends UnitTestCase
+{   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+     
+   public void testStartStop() throws Exception
+   {
+      RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+      ConnectionParams params = new ConnectionParamsImpl();
+      Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
+      ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+      MinaConnector connector = new MinaConnector(location, params, handler, listener);
+      
+      connector.start();
+      connector.close();
+   }
+   
+   public void testNullParams() throws Exception
+   {
+      RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+      ConnectionParams params = new ConnectionParamsImpl();
+      Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
+      ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+      try
+      {
+         new MinaConnector(null, params, handler, listener);
+         
+         fail("Should throw Exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         //Ok
+      }
+      
+      try
+      {
+         new MinaConnector(location, null, handler, listener);
+         
+         fail("Should throw Exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         //Ok
+      }
+      
+      try
+      {
+         new MinaConnector(location, params, null, listener);
+         
+         fail("Should throw Exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         //Ok
+      }
+      
+      try
+      {
+         new MinaConnector(location, params, handler, null);
+         
+         fail("Should throw Exception");
+      }
+      catch (IllegalArgumentException e)
+      {
+         //Ok
+      }
+   }
+}


Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native




More information about the jboss-cvs-commits mailing list