[jboss-cvs] JBoss Messaging SVN: r3636 - in trunk: src/main/org/jboss/jms/client/impl and 16 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 28 09:35:42 EST 2008


Author: jmesnil
Date: 2008-01-28 09:35:41 -0500 (Mon, 28 Jan 2008)
New Revision: 3636

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java
   trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java
Modified:
   trunk/.classpath
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
* implemented JBMESSAGING-1196 - added heartbeat + tests
* removed unused version byte from AbstractPacket
* added oneWay boolean to AbstractPacket to indicate whether a response is expected

Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/.classpath	2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,6 +17,8 @@
 	<classpathentry kind="src" path="docs/examples/topic/src"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
+	<classpathentry kind="src" path="tests/etc/ide"/>
+	<classpathentry excluding="ide/" kind="src" path="tests/etc"/>
 	<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
@@ -71,9 +73,8 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
 	<classpathentry kind="lib" path="lib/je-3.2.44.jar"/>
 	<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
-	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="/home/tim/workspace/mina-trunk/core/src/main/java"/>
+	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-sources.jar"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
-	<classpathentry kind="lib" path="tests/etc"/>
 	<classpathentry kind="lib" path="src/etc/server/default/config"/>
 	<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
 	<classpathentry kind="lib" path="tests/lib/jdbc-drivers/mysql-connector-java-5.1.5-bin.jar"/>

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -122,7 +122,7 @@
       MessagingRemotingConnection remotingConnection = null;
       try
       {
-         remotingConnection = new MessagingRemotingConnection(version, serverLocatorURI);
+         remotingConnection = new MessagingRemotingConnection(serverLocatorURI);
        
          remotingConnection.start();
          

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessagingRemotingConnection.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -70,14 +70,10 @@
    // explicitly remove it from the remoting client
    private ConsolidatedRemotingConnectionListener remotingConnectionListener;
    
-   private Version version;
-     
    // Constructors ---------------------------------------------------------------------------------
 
-   public MessagingRemotingConnection(Version version, String serverLocatorURI) throws Exception
+   public MessagingRemotingConnection(String serverLocatorURI) throws Exception
    {
-      this.version = version;
-      
       serverLocator = new ServerLocator(serverLocatorURI);
       
       log.trace(this + " created");
@@ -130,9 +126,6 @@
    public void sendOneWay(String id, AbstractPacket packet) throws JMSException
    {
       packet.setTargetID(id);
-      
-      packet.setVersion(version.getProviderIncrementingVersion());
-      
       client.sendOneWay(packet);      
    }
    
@@ -140,8 +133,6 @@
    {
       packet.setTargetID(id);
       
-      packet.setVersion(version.getProviderIncrementingVersion());
-      
       try
       {
          AbstractPacket response = (AbstractPacket) client.sendBlocking(packet);
@@ -166,6 +157,9 @@
    public synchronized void addConnectionListener(ConsolidatedRemotingConnectionListener listener)
    {
       this.remotingConnectionListener = listener;
+      if (client != null)
+         client.addConnectionListener(remotingConnectionListener);
+      
    }
 
    public synchronized ConsolidatedRemotingConnectionListener getConnectionListener()

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -35,6 +35,7 @@
 
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Logger;
@@ -47,7 +48,7 @@
  *
  * $Id$
  */
-public class SimpleConnectionManager implements ConnectionManager
+public class SimpleConnectionManager implements ConnectionManager, ConnectionExceptionListener
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -197,6 +198,13 @@
       //NOOP
    }
 
+   // ConnectionExceptionListener ------------------------------------------------------------------
+   
+   public void handleConnectionException(Exception e, String clientSessionID)
+   {
+      handleClientFailure(clientSessionID , true);
+   }
+   
    // Public ---------------------------------------------------------------------------------------
 
    /*
@@ -268,7 +276,7 @@
          {
             try
             {
-      			log.debug("clPearing up state for connection " + sce);
+      			log.debug("clearing up state for connection " + sce);
                sce.closing();
                sce.close();
                log.debug("cleared up state for connection " + sce);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -45,7 +45,6 @@
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
-import org.jboss.messaging.core.remoting.wireformat.ClosingMessage;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -306,19 +305,12 @@
             } else if (type == MSG_BROWSER_RESET)
             {
                reset();
-
-               response = new NullPacket();
             } else if (type == PacketType.MSG_CLOSING)
             {
-               ClosingMessage request = (ClosingMessage) packet;
                closing();
-
-               response = new NullPacket();
             } else if (type == MSG_CLOSE)
             {
                close();
-
-               response = new NullPacket();
             } else
             {
                response = new JMSExceptionMessage(new MessagingJMSException(
@@ -326,12 +318,16 @@
             }
 
             // reply if necessary
+            if (response == null && packet.isOneWay() == false)
+            {
+               response = new NullPacket();               
+            }
+            
             if (response != null)
             {
                response.normalize(packet);
                sender.send(response);
             }
-
          } catch (JMSException e)
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -583,18 +583,12 @@
             } else if (type == MSG_STOPCONNECTION)
             {
                stop();
-
-               response = new NullPacket();
             } else if (type == PacketType.MSG_CLOSING)
             {              
                closing();
-
-               response = new NullPacket();
             } else if (type == MSG_CLOSE)
             {
                close();
-
-               response = new NullPacket();
             } 
             else if (type == REQ_GETCLIENTID)
             {
@@ -603,8 +597,6 @@
             {
                SetClientIDMessage message = (SetClientIDMessage) packet;
                setClientID(message.getClientID());
-
-               response = new NullPacket();
             } else
             {
                response = new JMSExceptionMessage(new MessagingJMSException(
@@ -612,12 +604,16 @@
             }
 
             // reply if necessary
+            if (response == null && packet.isOneWay() == false)
+            {
+               response = new NullPacket();               
+            }
+            
             if (response != null)
             {
                response.normalize(packet);
                sender.send(response);
             }
-
          } catch (JMSException e)
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -44,7 +44,6 @@
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
-import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -569,14 +568,10 @@
             } else if (type == PacketType.MSG_CLOSING)
             {
                closing();
-               
-               response = new NullPacket();
             } else if (type == MSG_CLOSE)
             {
                close();
                setReplier(null);
-               
-               response = new NullPacket();
             } else
             {
                response = new JMSExceptionMessage(new MessagingJMSException(
@@ -584,6 +579,11 @@
             }
 
             // reply if necessary
+            if (response == null && packet.isOneWay() == false)
+            {
+               response = new NullPacket();               
+            }
+            
             if (response != null)
             {
                response.normalize(packet);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -1540,37 +1540,26 @@
             else if (type == PacketType.MSG_CLOSING)
             {
                closing();
-
-               response = new NullPacket();
             } else if (type == MSG_CLOSE)
             {
                close();
-
-               response = new NullPacket();
             } else if (type == MSG_UNSUBSCRIBE)
             {
                UnsubscribeMessage message = (UnsubscribeMessage) packet;
                unsubscribe(message.getSubscriptionName());
-
-               response = new NullPacket();
             } else if (type == MSG_ADDTEMPORARYDESTINATION)
             {
                AddTemporaryDestinationMessage message = (AddTemporaryDestinationMessage) packet;
                addTemporaryDestination(message.getDestination());
-
-               response = new NullPacket();
             } else if (type == MSG_DELETETEMPORARYDESTINATION)
             {
                DeleteTemporaryDestinationMessage message = (DeleteTemporaryDestinationMessage) packet;
                deleteTemporaryDestination(message.getDestination());
-
-               response = new NullPacket();
             }            
             else if (type == PacketType.MSG_ACKNOWLEDGE)
             {
                SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
                acknowledge(message.getDeliveryID(), message.isAllUpTo());
-               response = new NullPacket();
             }
             else if (type == PacketType.MSG_COMMIT)
             {
@@ -1592,12 +1581,16 @@
             }
 
             // reply if necessary
+            if (response == null && packet.isOneWay() == false)
+            {
+               response = new NullPacket();               
+            }
+            
             if (response != null)
             {
                response.normalize(packet);
                sender.send(response);
             }
-
          } catch (JMSException e)
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);

Modified: trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -72,8 +72,6 @@
                                                   reference.getDeliveryCount() + 1);
       
       message.setTargetID(consumerID);
-                  
-      message.setVersion((byte)0);
       
       sender.send(message);
    }

Added: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionExceptionListener.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,19 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface ConnectionExceptionListener
+{
+   void handleConnectionException(Exception e, String sessionID);
+
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface KeepAliveFactory
+{
+
+   Ping ping();
+   
+   Pong pong();
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -13,6 +13,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
 import org.jboss.messaging.util.Logger;
 
 /**
@@ -33,6 +34,7 @@
    // Static --------------------------------------------------------
 
    public static final PacketDispatcher client = new PacketDispatcher();
+   public static final Map<String, String> sessions = new ConcurrentHashMap<String, String>();
 
    // Constructors --------------------------------------------------
 
@@ -77,6 +79,18 @@
    
    public void dispatch(AbstractPacket packet, PacketSender sender)
    {
+      //FIXME better separation between client and server PacketDispatchers
+      if (this != client)
+      {
+         if (packet instanceof SetSessionIDMessage)
+         {
+            String clientSessionID = ((SetSessionIDMessage)packet).getSessionID();
+            if (log.isDebugEnabled())
+               log.debug("associated server session " + sender.getSessionID() + " to client " + clientSessionID);
+            sessions.put(sender.getSessionID(), clientSessionID);
+            return;
+         }
+      }
       String targetID = packet.getTargetID();
       if (NO_ID_SET.equals(targetID))
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,4 +17,6 @@
 public interface PacketSender
 {
    void send(AbstractPacket packet);
+
+   String getSessionID();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -10,7 +10,6 @@
 import static org.jboss.messaging.core.remoting.codec.DecoderStatus.NOT_OK;
 import static org.jboss.messaging.core.remoting.codec.DecoderStatus.OK;
 import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_VERSION_SET;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -38,6 +37,8 @@
 
    public static final byte FALSE = (byte) 1;
 
+   public static final int BOOLEAN_LENGTH = 1;
+
    public static final int INT_LENGTH = 4;
 
    public static final int FLOAT_LENGTH = 4;
@@ -101,12 +102,6 @@
       assert packet != null;
       assert buf != null;
 
-      byte version = packet.getVersion();
-      if (version == NO_VERSION_SET)
-      {
-         throw new IllegalStateException("packet must be versioned: " + packet);
-      }
-
       long correlationID = packet.getCorrelationID();
       // to optimize the size of the packets, if the targetID
       // or the callbackID are not set, they are encoded as null
@@ -121,14 +116,14 @@
       {
          callbackID = null;
       }
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + BOOLEAN_LENGTH;
 
       buf.put(packet.getType().byteValue());
-      buf.put(version);
       buf.putInt(headerLength);
       buf.putLong(correlationID);
       buf.putNullableString(targetID);
       buf.putNullableString(callbackID);
+      buf.putBoolean(packet.isOneWay());
 
       encodeBody(packet, buf);
    }
@@ -149,9 +144,9 @@
 
    public DecoderStatus decodable(RemotingBuffer buffer)
    {
-      if (buffer.remaining() < 2)
+      if (buffer.remaining() < 1)
       {
-         // can not read packet type & version
+         // can not read packet type
          return NEED_DATA;
       }
       byte t = buffer.get();
@@ -159,7 +154,6 @@
       {
          return NOT_OK;
       }
-      buffer.get(); // version
       if (buffer.remaining() < INT_LENGTH)
       {
          if (log.isDebugEnabled())
@@ -189,7 +183,7 @@
       {
          return NOT_OK;
       }
-
+      buffer.getBoolean(); // oneWay boolean
       if (buffer.remaining() < INT_LENGTH)
       {
          if (log.isDebugEnabled())
@@ -214,19 +208,18 @@
    public P decode(RemotingBuffer wrapper) throws Exception
    {
       wrapper.get(); // skip message type
-      byte version = wrapper.get();
       wrapper.getInt(); // skip header length
       long correlationID = wrapper.getLong();
       String targetID = wrapper.getNullableString();
       String callbackID = wrapper.getNullableString();
-
+      boolean oneWay = wrapper.getBoolean();
+      
       P packet = decodeBody(wrapper);
 
       if (packet == null)
       {
          return null;
       }
-      packet.setVersion(version);
       if (targetID == null)
          targetID = NO_ID_SET;
       packet.setTargetID(targetID);
@@ -234,6 +227,7 @@
       if (callbackID == null)
          callbackID = NO_ID_SET;
       packet.setCallbackID(callbackID);
+      packet.setOneWay(oneWay);
 
       return packet;
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -38,6 +38,10 @@
          throws Exception
    {
       byte[] bytes = packet.getBytes();
+      
+      int bodyLength = INT_LENGTH + bytes.length;
+      
+      out.putInt(bodyLength);
       out.putInt(bytes.length);
       out.put(bytes);
    }
@@ -51,8 +55,8 @@
       {
          return null;
       }
-      
-      byte[] bytes = new byte[bodyLength];
+      int byteLength = in.getInt();
+      byte[] bytes = new byte[byteLength];
       in.get(bytes);
 
       return new BytesPacket(bytes);

Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SetSessionIDMessageCodec.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
+
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class SetSessionIDMessageCodec extends
+      AbstractPacketCodec<SetSessionIDMessage>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SetSessionIDMessageCodec()
+   {
+      super(MSG_SETSESSIONID);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(SetSessionIDMessage message, RemotingBuffer out)
+         throws Exception
+   {
+      String sessionID = message.getSessionID();
+
+      out.putInt(sizeof(sessionID));
+      out.putNullableString(sessionID);
+   }
+
+   @Override
+   protected SetSessionIDMessage decodeBody(RemotingBuffer in) throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      String sessionID = in.getNullableString();
+
+      return new SetSessionIDMessage(sessionID);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientImpl.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -99,26 +99,22 @@
       return session.getID();
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.remoting.Client#sendOneWay(org.jboss.messaging.core.remoting.wireformat.AbstractPacket)
-    */
    public void sendOneWay(AbstractPacket packet) throws JMSException
    {
       assert packet != null;
       checkConnected();
-
+      packet.setOneWay(true);
+      
       session.write(packet);
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.remoting.Client#sendBlocking(org.jboss.messaging.core.remoting.wireformat.AbstractPacket)
-    */
    public AbstractPacket sendBlocking(AbstractPacket packet)
          throws IOException, JMSException
    {
       assert packet != null;
       checkConnected();
 
+      packet.setOneWay(false);
       try
       {
          AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet, 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -70,11 +70,15 @@
       serverDispatcher.dispatch((AbstractPacket) object,
             new PacketSender()
             {
-
                public void send(AbstractPacket response)
                {
                   PacketDispatcher.client.dispatch(response, null);
                }
+               
+               public String getSessionID()
+               {
+                  return getID();
+               }
             });
    }
 
@@ -91,9 +95,17 @@
                {
                   responses[0] = response;
                }
+
+               public String getSessionID()
+               {
+                  return getID();
+               }
             });
 
-      assert responses[0] != null;
+      if (responses[0] == null)
+      {
+         throw new IllegalStateException("No response received for request " + request);
+      }
 
       return responses[0];
    }

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ClientKeepAliveFactory.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * Default client-side {@link KeepAliveFactory}: hands out a new Ping / Pong per call.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class ClientKeepAliveFactory implements KeepAliveFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // KeepAliveFactory implementation -------------------------------
+   
+   public Ping ping()
+   {
+      return new Ping();
+   }
+
+   public Pong pong()
+   {
+      return new Pong();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,6 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
+import static org.apache.mina.filter.keepalive.KeepAlivePolicy.EXCEPTION;
 import static org.apache.mina.filter.logging.LogLevel.TRACE;
 import static org.apache.mina.filter.logging.LogLevel.WARN;
 
@@ -15,9 +16,11 @@
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.keepalive.KeepAliveFilter;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.logging.MdcInjectionFilter;
 import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -37,6 +40,24 @@
 
    // Public --------------------------------------------------------
 
+   public static void addKeepAliveFilter(DefaultIoFilterChainBuilder filterChain,
+         KeepAliveFactory factory, int keepAliveInterval, int keepAliveTimeout)
+   {
+      assert filterChain != null;
+      assert factory != null;
+      // the timeout may equal the interval but must never exceed it
+      if (keepAliveTimeout > keepAliveInterval)
+      {
+         throw new IllegalArgumentException("timeout must not be greater than the interval: "
+               + "keepAliveTimeout= " + keepAliveTimeout
+               + ", keepAliveInterval=" + keepAliveInterval);
+      }
+      // EXCEPTION policy: presumably what raises the KeepAliveTimeoutException seen by MinaHandler
+      filterChain.addLast("keep-alive", new KeepAliveFilter(
+            new MinaKeepAliveFactory(factory), EXCEPTION, keepAliveInterval,
+            keepAliveTimeout));
+   }
+
    // Package protected ---------------------------------------------
 
    static void addCodecFilter(DefaultIoFilterChainBuilder filterChain)
@@ -73,23 +94,24 @@
 
       filterChain.addLast("logger", filter);
    }
-   
+
    static void addExecutorFilter(DefaultIoFilterChainBuilder filterChain)
    {
       ExecutorFilter executorFilter = new ExecutorFilter();
       filterChain.addLast("executor", executorFilter);
    }
-   
+
    static ScheduledExecutorService addBlockingRequestResponseFilter(
          DefaultIoFilterChainBuilder filterChain)
    {
       assert filterChain != null;
 
-      ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+      ScheduledExecutorService executorService = Executors
+            .newScheduledThreadPool(1);
       RequestResponseFilter filter = new RequestResponseFilter(
-            new MinaInspector(), executorService);      
+            new MinaInspector(), executorService);
       filterChain.addLast("reqres", filter);
-      
+
       return executorService;
    }
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/KeepAliveNotifier.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,21 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * Callback told that a keep-alive timed out, with the ID of the affected session.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public interface KeepAliveNotifier
+{
+   public abstract void notifyKeepAliveTimeout(TimeoutException e, String remoteSessionID);
+}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -10,14 +10,19 @@
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addBlockingRequestResponseFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.ConnectFuture;
@@ -28,19 +33,24 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
 import org.jboss.messaging.core.remoting.NIOConnector;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
 import org.jboss.messaging.util.Logger;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ * 
  * @version <tt>$Revision$</tt>
- *
+ * 
  */
-public class MinaConnector implements NIOConnector
+public class MinaConnector implements NIOConnector, KeepAliveNotifier
 {
    // Constants -----------------------------------------------------
 
@@ -51,14 +61,16 @@
    private String host;
 
    private int port;
-   
+
    private NioSocketConnector connector;
 
    private ScheduledExecutorService blockingScheduler;
 
    private IoSession session;
 
+   // FIXME clean up this listener mess
    private Map<ConsolidatedRemotingConnectionListener, IoServiceListener> listeners = new HashMap<ConsolidatedRemotingConnectionListener, IoServiceListener>();
+   private ConnectionExceptionListener listener;
 
    // Static --------------------------------------------------------
 
@@ -66,33 +78,68 @@
 
    // Public --------------------------------------------------------
 
+   public MinaConnector(ServerLocator locator)
+   {
+      this(locator.getTransport(), locator.getHost(), locator.getPort(),
+            locator.getParameters(), new ClientKeepAliveFactory());
+   }
+
+   public MinaConnector(ServerLocator locator, KeepAliveFactory keepAliveFactory)
+   {
+      this(locator.getTransport(), locator.getHost(), locator.getPort(),
+            locator.getParameters(), keepAliveFactory);
+   }
+
    public MinaConnector(TransportType transport, String host, int port)
    {
+      this(transport, host, port, new HashMap<String, String>(),
+            new ClientKeepAliveFactory());
+   }
+
+   private MinaConnector(TransportType transport, String host, int port,
+         Map<String, String> parameters, KeepAliveFactory keepAliveFactory)
+   {
       assert transport == TCP;
       assert host != null;
       assert port > 0;
-      
+      assert parameters != null;
+      assert keepAliveFactory != null;
+
       this.host = host;
       this.port = port;
-      
+
       this.connector = new NioSocketConnector();
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
-      
+
+      // FIXME no hard-coded values
+      int keepAliveInterval = parameters.containsKey(KEEP_ALIVE_INTERVAL_KEY) ? Integer
+            .parseInt(parameters.get(KEEP_ALIVE_INTERVAL_KEY))
+            : 10;
+      int keepAliveTimeout = parameters.containsKey(KEEP_ALIVE_TIMEOUT_KEY) ? Integer
+            .parseInt(parameters.get(KEEP_ALIVE_TIMEOUT_KEY))
+            : 5;
+
       addMDCFilter(filterChain);
       addCodecFilter(filterChain);
       addLoggingFilter(filterChain);
       blockingScheduler = addBlockingRequestResponseFilter(filterChain);
+      addKeepAliveFilter(filterChain, keepAliveFactory, keepAliveInterval,
+            keepAliveTimeout);
       addExecutorFilter(filterChain);
 
-      connector.setHandler(new MinaHandler(PacketDispatcher.client));
+      connector.setHandler(new MinaHandler(PacketDispatcher.client, this));
       connector.getSessionConfig().setKeepAlive(true);
       connector.getSessionConfig().setReuseAddress(true);
    }
 
    // NIOConnector implementation -----------------------------------
-   
-   public NIOSession connect() throws IOException 
+
+   public NIOSession connect() throws IOException
    {
+      if (session != null)
+      {
+         return new MinaSession(session);
+      }
       InetSocketAddress address = new InetSocketAddress(host, port);
       ConnectFuture future = connector.connect(address);
       connector.setDefaultRemoteAddress(address);
@@ -103,9 +150,13 @@
          throw new IOException("Cannot connect to " + address.toString());
       }
       this.session = future.getSession();
+      AbstractPacket packet = new SetSessionIDMessage(Long.toString(session
+            .getId()));
+      session.write(packet);
+
       return new MinaSession(session);
    }
-   
+
    public boolean disconnect()
    {
       if (session == null)
@@ -118,14 +169,14 @@
 
       connector.dispose();
       blockingScheduler.shutdown();
-      
+
       connector = null;
       blockingScheduler = null;
       session = null;
 
       return closed;
    }
-   
+
    public void addConnectionListener(
          final ConsolidatedRemotingConnectionListener listener)
    {
@@ -139,8 +190,9 @@
       if (log.isTraceEnabled())
          log.trace("added listener " + listener + " to " + this);
    }
-   
-   public void removeConnectionListener(ConsolidatedRemotingConnectionListener listener)
+
+   public void removeConnectionListener(
+         ConsolidatedRemotingConnectionListener listener)
    {
       assert listener != null;
       assert connector != null;
@@ -151,7 +203,7 @@
       if (log.isTraceEnabled())
          log.trace("removed listener " + listener + " from " + this);
    }
-   
+
    public String getServerURI()
    {
       if (connector == null)
@@ -162,11 +214,34 @@
       if (address != null)
       {
          return TCP + "://" + address.toString();
-      } else {
+      } else
+      {
          return TCP + "://" + host + ":" + port;
       }
    }
 
+   public void setConnectionExceptionListener(ConnectionExceptionListener listener)
+   {
+      assert listener != null;
+      
+      this.listener = listener;
+   }
+   
+   // KeepAliveNotifier implementation ------------------------------
+   
+   public void notifyKeepAliveTimeout(TimeoutException cause, String remoteSessionID)
+   {
+      // the dedicated exception listener is also told which session timed out
+      if (listener != null)
+      {
+         listener.handleConnectionException(cause, remoteSessionID);
+      }
+      for (ConsolidatedRemotingConnectionListener remotingListener : listeners.keySet())
+      {
+         remotingListener.handleConnectionException(cause);
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,12 +6,16 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
+import java.util.concurrent.TimeoutException;
+
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.keepalive.KeepAliveTimeoutException;
 import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
 import org.jboss.messaging.util.Logger;
 
 /**
@@ -30,13 +34,16 @@
 
    private final PacketDispatcher dispatcher;
 
+   private KeepAliveNotifier keepAliveManager;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
    
-   public MinaHandler(PacketDispatcher dispatcher)
+   public MinaHandler(PacketDispatcher dispatcher, KeepAliveNotifier keepAliveManager)
    {
       this.dispatcher = dispatcher;
+      this.keepAliveManager = keepAliveManager;
    }
 
    // Public --------------------------------------------------------
@@ -47,6 +54,13 @@
    public void exceptionCaught(IoSession session, Throwable cause)
          throws Exception
    {
+      if (cause instanceof KeepAliveTimeoutException && keepAliveManager != null)
+      {
+         String serverSessionID = Long.toString(session.getId());
+         TimeoutException e = new TimeoutException();
+         e.initCause(cause);
+         keepAliveManager.notifyKeepAliveTimeout(e, serverSessionID);
+      }
       // FIXME ugly way to know we're on the server side
       // close session only on the server side
       if (dispatcher != PacketDispatcher.client)
@@ -66,6 +80,14 @@
          // do nothing
          return;
       }
+      
+      if (message instanceof Ping)
+      {
+         log.trace("received ping " + message);
+         // response is handled by the keep-alive filter.
+         // do nothing
+         return;
+      }
 
       if (!(message instanceof AbstractPacket))
       {
@@ -79,6 +101,11 @@
          {
             session.write(p);
          }
+         
+         public String getSessionID()
+         {
+            return Long.toString(session.getId());
+         }
       };
 
       if (log.isTraceEnabled())

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaKeepAliveFactory.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * Adapts a {@link KeepAliveFactory} to MINA's KeepAliveMessageFactory (Ping = request, Pong = response).
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaKeepAliveFactory implements KeepAliveMessageFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private KeepAliveFactory innerFactory;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public MinaKeepAliveFactory(KeepAliveFactory factory)
+   {
+      assert factory != null;
+      
+      this.innerFactory = factory;
+   }
+
+   // Public --------------------------------------------------------
+
+   // KeepAliveMessageFactory implementation ------------------------
+   
+   public Object getRequest(IoSession session)
+   {
+      return innerFactory.ping();
+   }
+
+   public Object getResponse(IoSession session, Object request)
+   {
+      return innerFactory.pong();
+   }
+
+   public boolean isRequest(IoSession session, Object request)
+   {
+      return (request instanceof Ping);
+   }
+
+   public boolean isResponse(IoSession session, Object response)
+   {
+      return (response instanceof Pong);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -9,15 +9,19 @@
 import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.TransportType;
@@ -25,16 +29,20 @@
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ * 
  * @version <tt>$Revision$</tt>
- *
+ * 
  */
-public class MinaService
+public class MinaService implements KeepAliveNotifier
 {
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MinaService.class);
 
+   public static final String KEEP_ALIVE_INTERVAL_KEY = "keepAliveInterval";
+   public static final String KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeout";
+   public static final String DISABLE_INVM_KEY = "disable-invm";
+
    // Attributes ----------------------------------------------------
 
    private TransportType transport;
@@ -42,68 +50,107 @@
    private final String host;
 
    private final int port;
-   
+
    private Map<String, String> parameters;
-   
+
    private NioSocketAcceptor acceptor;
 
-   private int blockingRequestTimeout = 5;
-
    private PacketDispatcher dispatcher;
 
+   private ConnectionExceptionListener listener;
+
+   private KeepAliveFactory factory;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    public MinaService(String transport, String host, int port)
    {
-      this(TransportType.valueOf(transport.toUpperCase()), host, port);
+      this(TransportType.valueOf(transport.toUpperCase()), host, port, new ServerKeepAliveFactory());
    }
-   
-   public MinaService(TransportType transport, String host, int port)
+
+   public MinaService(TransportType transport, String host, int port, KeepAliveFactory factory)
    {
       assert transport != null;
-      assert host !=  null;
+      assert host != null;
       assert port > 0;
-      
+      assert factory != null;
+
       this.transport = transport;
       this.host = host;
       this.port = port;
       this.parameters = new HashMap<String, String>();
+      this.factory = factory;
       this.dispatcher = new PacketDispatcher();
    }
 
+   public void setConnectionExceptionListener(ConnectionExceptionListener listener)
+   {
+      assert listener != null;
+
+      this.listener = listener;
+   }
+   
+   // KeepAliveNotifier implementation ------------------------------
+
+   public void notifyKeepAliveTimeout(TimeoutException e, String remoteSessionID)
+   {
+      if (listener != null)
+      {
+         String clientSessionID = PacketDispatcher.sessions.get(remoteSessionID);
+         listener.handleConnectionException(e, clientSessionID);
+      }
+   }
+
    // Public --------------------------------------------------------
-   
+
    public void setParameters(Map<String, String> parameters)
    {
       assert parameters != null;
-      
+
       this.parameters = parameters;
    }
 
+   public void setKeepAliveFactory(KeepAliveFactory factory)
+   {
+      assert factory != null;
+      
+      this.factory = factory;
+   }
+
    public ServerLocator getLocator()
    {
       return new ServerLocator(transport, host, port, parameters);
    }
-   
+
    public PacketDispatcher getDispatcher()
    {
       return dispatcher;
    }
-   
+
    public void start() throws Exception
    {
       if (acceptor == null)
       {
          acceptor = new NioSocketAcceptor();
          DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-         
+
          addMDCFilter(filterChain);
          addCodecFilter(filterChain);
          addLoggingFilter(filterChain);
+         if (parameters.containsKey(KEEP_ALIVE_INTERVAL_KEY)
+               && parameters.containsKey(KEEP_ALIVE_TIMEOUT_KEY))
+         {
+            int keepAliveInterval = Integer.parseInt(parameters
+                  .get(KEEP_ALIVE_INTERVAL_KEY));
+            int keepAliveTimeout = Integer.parseInt(parameters
+                  .get(KEEP_ALIVE_TIMEOUT_KEY));
+            addKeepAliveFilter(filterChain, factory,
+                  keepAliveInterval, keepAliveTimeout);
+         }
          addExecutorFilter(filterChain);
-         
+
          // Bind
          acceptor.setLocalAddress(new InetSocketAddress(host, port));
          acceptor.setReuseAddress(true);
@@ -111,11 +158,17 @@
          acceptor.getSessionConfig().setKeepAlive(true);
          acceptor.setDisconnectOnUnbind(false);
 
-         acceptor.setHandler(new MinaHandler(dispatcher));
+         acceptor.setHandler(new MinaHandler(dispatcher, this));
          acceptor.bind();
-         
-         REGISTRY.register(getLocator(), dispatcher);
-      } 
+
+         boolean disableInvm = false;
+         if (parameters.containsKey(DISABLE_INVM_KEY))
+               disableInvm = Boolean.valueOf(parameters.get(DISABLE_INVM_KEY)).booleanValue();
+         if (log.isDebugEnabled())
+            log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
+         if (!disableInvm)
+            REGISTRY.register(getLocator(), dispatcher);
+      }
    }
 
    public void stop()
@@ -124,12 +177,13 @@
       {
          acceptor.unbind();
          acceptor.dispose();
+         System.err.println(acceptor.isDisposed());
          acceptor = null;
-         
+
          REGISTRY.unregister(getLocator());
-      }    
+      }
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -7,6 +7,8 @@
 package org.jboss.messaging.core.remoting.impl.mina;
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
 
 import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
 import org.jboss.messaging.core.remoting.codec.AbstractPacketCodec;
@@ -36,6 +38,7 @@
 import org.jboss.messaging.core.remoting.codec.SessionCancelMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetSessionIDMessageCodec;
 import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
 import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -68,6 +71,8 @@
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
 import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
@@ -75,6 +80,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
 import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -107,6 +113,10 @@
       addCodec(TextPacket.class, TextPacketCodec.class);
       addCodec(BytesPacket.class, BytesPacketCodec.class);
 
+      addCodecForEmptyPacket(PING, Ping.class);
+      addCodecForEmptyPacket(PONG, Pong.class);
+      addCodec(SetSessionIDMessage.class, SetSessionIDMessageCodec.class);
+
       addCodec(CreateConnectionRequest.class,
             ConnectionFactoryCreateConnectionRequestCodec.class);
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * Default server-side {@link KeepAliveFactory}: hands out a new Ping / Pong per call.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class ServerKeepAliveFactory implements KeepAliveFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // KeepAliveFactory implementation -------------------------------
+   
+   public Ping ping()
+   {
+      return new Ping();
+   }
+
+   public Pong pong()
+   {
+      return new Pong();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -8,6 +8,8 @@
 
 import static org.jboss.messaging.core.remoting.Assert.assertValidID;
 
+import org.jboss.messaging.core.remoting.Client;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
  * 
@@ -21,12 +23,8 @@
 
    public static final long NO_CORRELATION_ID = -1L;
 
-   public static final byte NO_VERSION_SET = (byte)-1;
-
    // Attributes ----------------------------------------------------
 
-   private byte version = NO_VERSION_SET;
-
    private long correlationID = NO_CORRELATION_ID;
 
    private String targetID = NO_ID_SET;
@@ -35,6 +33,14 @@
 
    private final PacketType type;
 
+   /**
+    * <code>oneWay</code> is <code>true</code> when the packet is sent "one way"
+    * by the client which does not expect any response to it.
+    * 
+    * @see Client#sendOneWay(AbstractPacket)
+    */
+   private boolean oneWay = false;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -53,18 +59,6 @@
       return type;
    }
 
-   public void setVersion(byte version)
-   {
-      assert version != NO_VERSION_SET;
-
-      this.version = version;
-   }
-
-   public byte getVersion()
-   {
-      return version;
-   }
-
    public void setCorrelationID(long correlationID)
    {
       this.correlationID = correlationID;
@@ -99,11 +93,20 @@
       return callbackID;
    }
 
+   public void setOneWay(boolean oneWay) // true = sent one way, no response expected
+   {
+      this.oneWay = oneWay;
+   }
+
+   public boolean isOneWay() // reports the flag as last set; it starts out false
+   {
+      return oneWay;
+   }
+   
    public void normalize(AbstractPacket other)
    {
       assert other != null;
 
-      setVersion(other.getVersion());
       setCorrelationID(other.getCorrelationID());
       setTargetID(other.getCallbackID());
    }
@@ -126,9 +129,9 @@
 
    protected String getParentString()
    {
-      return "PACKET[type=" + type + ", version=" + version
+      return "PACKET[type=" + type
             + ", correlationID=" + correlationID + ", targetID=" + targetID
-            + ", callbackID=" + callbackID;
+            + ", callbackID=" + callbackID + ", oneWay=" + oneWay;
    }
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -17,6 +17,9 @@
    MSG_JMSEXCEPTION               ((byte) 2),
    TEXT                           ((byte) 3),
    BYTES                          ((byte) 4),
+   PING                           ((byte) 5),
+   PONG                           ((byte) 6),
+   MSG_SETSESSIONID               ((byte) 7),
 
    // Connection factory
    REQ_CREATECONNECTION           ((byte)10),

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+
+/**
+ * Packet without any field of its own; its packet type is always PING.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @version <tt>$Revision$</tt>
+ */
+public class Ping extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public Ping()
+   {
+      super(PING); // PING is statically imported from PacketType
+   }
+   
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
+
+/**
+ * Packet without any field of its own; its packet type is always PONG.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @version <tt>$Revision$</tt>
+ */
+public class Pong extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public Pong()
+   {
+      super(PONG); // PONG is statically imported from PacketType
+   }
+   
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
  */
 package org.jboss.messaging.core.remoting.wireformat;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCEL;
 
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -28,7 +30,7 @@
 
    public SessionCancelMessage(long deliveryID, boolean expired)
    {
-      super(PacketType.MSG_CANCEL);
+      super(MSG_CANCEL);
       
       this.deliveryID = deliveryID;
       
@@ -47,6 +49,12 @@
       return expired;
    }
 
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", deliveryID=" + deliveryID + ", expired=" + expired + "]";
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRecoverMessage.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
  */
 package org.jboss.messaging.core.remoting.wireformat;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVER;
 
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -24,7 +26,7 @@
 
    public SessionRecoverMessage()
    {
-      super(PacketType.MSG_RECOVER);
+      super(MSG_RECOVER);
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRollbackMessage.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,7 +6,9 @@
  */
 package org.jboss.messaging.core.remoting.wireformat;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ROLLBACK;
 
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -24,7 +26,7 @@
 
    public SessionRollbackMessage()
    {
-      super(PacketType.MSG_ROLLBACK);
+      super(MSG_ROLLBACK);
    }
 
    // Public --------------------------------------------------------

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SetSessionIDMessage.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
+
+/**
+ * Packet of type MSG_SETSESSIONID that carries a single session ID.
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @version <tt>$Revision$</tt>
+ */
+public class SetSessionIDMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String sessionID; // assigned once in the constructor
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SetSessionIDMessage(String sessionID)
+   {
+      super(MSG_SETSESSIONID);
+
+      assertValidID(sessionID); // NOTE(review): presumably rejects a missing ID - confirm in Assert
+      this.sessionID = sessionID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getSessionID()
+   {
+      return sessionID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", sessionID=" + sessionID + "]"; // "]" closes the "PACKET[" prefix
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/build.xml	2008-01-28 14:35:41 UTC (rev 3636)
@@ -512,7 +512,7 @@
                <exclude name="**/jms/MemLeakTest.class"/>
                <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
                <exclude name="**/jms/stress/**"/>
-               <exclude name="**/jms/crash/**"/>
+               <exclude name="**/jms/crash/ClientCrashTest.class"/>
                <exclude name="**/jms/bridge/**"/>
                <exclude name="**/jms/manual/**"/>
                <exclude name="**/jms/clustering/**"/>

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -41,6 +41,8 @@
 
    protected PacketDispatcher serverDispatcher;
 
+   private NIOConnector connector;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -49,7 +51,8 @@
 
    public void testConnected() throws Exception
    {
-      Client client = new ClientImpl(createNIOConnector(), createServerLocator());
+      NIOConnector connector = createNIOConnector();
+      Client client = new ClientImpl(connector, createServerLocator());
       
       assertFalse(client.isConnected());
 
@@ -59,6 +62,8 @@
       assertTrue(client.disconnect());
       assertFalse(client.isConnected());
       assertFalse(client.disconnect());
+      
+      connector.disconnect();
    }    
       
    public void testSendOneWay() throws Exception
@@ -66,7 +71,6 @@
       serverPacketHandler.expectMessage(1);
 
       TextPacket packet = new TextPacket("testSendOneWay");
-      packet.setVersion((byte) 1);
       packet.setTargetID(serverPacketHandler.getID());
       client.sendOneWay(packet);
 
@@ -86,7 +90,6 @@
       for (int i = 0; i < MANY_MESSAGES; i++)
       {
          packets[i] = new TextPacket("testSendManyOneWay " + i);
-         packets[i].setVersion((byte) 1);
          packets[i].setTargetID(serverPacketHandler.getID());
          client.sendOneWay(packets[i]);
       }
@@ -110,7 +113,6 @@
       PacketDispatcher.client.register(callbackHandler);
 
       TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
-      packet.setVersion((byte) 1);
       packet.setTargetID(serverPacketHandler.getID());
       packet.setCallbackID(callbackHandler.getID());
 
@@ -126,7 +128,6 @@
    public void testSendBlocking() throws Exception
    {
       TextPacket request = new TextPacket("testSendBlocking");
-      request.setVersion((byte) 1);
       request.setTargetID(serverPacketHandler.getID());
 
       AbstractPacket receivedPacket = client.sendBlocking(request);
@@ -140,7 +141,6 @@
    public void testCorrelationCounter() throws Exception
    {
       TextPacket request = new TextPacket("testSendBlocking");
-      request.setVersion((byte) 1);
       request.setTargetID(serverPacketHandler.getID());
 
       AbstractPacket receivedPacket = client.sendBlocking(request);
@@ -164,7 +164,6 @@
 
       TextPacket packet = new TextPacket(
             "testClientHandlePacketSentByServer from client");
-      packet.setVersion((byte) 1);
       packet.setTargetID(serverPacketHandler.getID());
       // send a packet to create a sender when the server
       // handles the packet
@@ -176,7 +175,6 @@
       PacketSender sender = serverPacketHandler.getLastSender();
       TextPacket packetFromServer = new TextPacket(
             "testClientHandlePacketSentByServer from server");
-      packetFromServer.setVersion((byte) 1);
       packetFromServer.setTargetID(clientHandler.getID());
       sender.send(packetFromServer);
 
@@ -196,7 +194,7 @@
       serverDispatcher = startServer();
       
       ServerLocator serverLocator = createServerLocator();
-      NIOConnector connector = createNIOConnector();
+      connector = createNIOConnector();
       client = new ClientImpl(connector, serverLocator);
       client.connect();
       
@@ -209,6 +207,7 @@
    {
       serverDispatcher.unregister(serverPacketHandler.getID());
 
+      connector.disconnect();
       client.disconnect();
       stopServer();
       

Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ClientKeepAliveTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,263 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class ClientKeepAliveTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MinaService service;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Map<String, String> keepAliveSettings = new HashMap<String, String>(); // both values in seconds
+      keepAliveSettings.put(KEEP_ALIVE_TIMEOUT_KEY, String.valueOf(KEEP_ALIVE_TIMEOUT));
+      keepAliveSettings.put(KEEP_ALIVE_INTERVAL_KEY, String.valueOf(KEEP_ALIVE_INTERVAL));
+      service = new MinaService(TCP.toString(), "localhost", PORT);
+      service.setParameters(keepAliveSettings);
+      service.start(); // every test talks to this freshly started service
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service.stop(); // shuts down the service that setUp() started
+      service = null; // the next setUp() assigns a new instance
+   }
+
+   public void testKeepAliveWithClientOK() throws Exception
+   {
+      // counted down only if the service reports a connection exception
+      final CountDownLatch failureReported = new CountDownLatch(1);
+      KeepAliveFactory respondingClient = createMock(KeepAliveFactory.class);
+
+      // the client never sends a ping
+      expect(respondingClient.ping()).andStubReturn(null);
+      // the client is responding
+      expect(respondingClient.pong()).andReturn(new Pong()).atLeastOnce();
+      replay(respondingClient);
+
+      MinaConnector clientConnector = new MinaConnector(service.getLocator(), respondingClient);
+      clientConnector.connect();
+
+      ConnectionExceptionListener failureListener = new ConnectionExceptionListener()
+      {
+         public void handleConnectionException(Exception e, String sessionID)
+         {
+            failureReported.countDown();
+         }
+      };
+      service.setConnectionExceptionListener(failureListener);
+
+      // wait one second past interval + timeout: no failure may be reported
+      long maxWait = KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 1;
+      assertFalse(failureReported.await(maxWait, SECONDS));
+
+      clientConnector.disconnect();
+
+      verify(respondingClient);
+   }
+   
+   public void testKeepAliveWithClientNotResponding() throws Exception
+   {
+      KeepAliveFactory silentClient = createMock(KeepAliveFactory.class);
+
+      // the client never sends a ping
+      expect(silentClient.ping()).andStubReturn(null);
+      // no pong -> the client is not responding
+      expect(silentClient.pong()).andReturn(null).atLeastOnce();
+      replay(silentClient);
+
+      MinaConnector clientConnector = new MinaConnector(service.getLocator(), silentClient);
+      String expectedSessionID = clientConnector.connect().getID();
+
+      // written by the listener, read by the test once the latch has opened
+      final String[] reportedSessionID = new String[1];
+      final CountDownLatch failureReported = new CountDownLatch(1);
+
+      ConnectionExceptionListener failureListener = new ConnectionExceptionListener()
+      {
+         public void handleConnectionException(Exception e, String sessionID)
+         {
+            reportedSessionID[0] = sessionID;
+            failureReported.countDown();
+         }
+      };
+      service.setConnectionExceptionListener(failureListener);
+
+      // allow interval + timeout + 2 seconds for the notification to arrive
+      long maxWait = KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 2;
+      boolean notified = failureReported.await(maxWait, SECONDS);
+      assertTrue("notification has not been received", notified);
+      assertNotNull(reportedSessionID[0]);
+      assertEquals(expectedSessionID, reportedSessionID[0]);
+
+      clientConnector.disconnect();
+
+      verify(silentClient);
+   }
+
+   public void testKeepAliveWithClientTooLongToRespond() throws Exception
+   {
+      KeepAliveFactory factory = new KeepAliveFactory()
+      {
+         public Ping ping()
+         {
+            return null;
+         }
+
+         public synchronized Pong pong()
+         {
+            // stall the pong: wait() takes milliseconds, so this blocks up to 7.2 s
+            try
+            {
+               wait(2 * 3600);
+            } catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
+            }
+            return new Pong();
+         }
+      };
+
+      try
+      {
+         MinaConnector connector = new MinaConnector(service.getLocator(),
+               factory);
+
+         NIOSession session = connector.connect();
+         String clientSessionID = session.getID();
+
+         final String[] clientSessionIDNotResponding = new String[1];
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         service.setConnectionExceptionListener(new ConnectionExceptionListener()
+         {
+            public void handleConnectionException(Exception e, String sessionID)
+            {
+               clientSessionIDNotResponding[0] = sessionID;
+               latch.countDown();
+            }
+         });
+
+         boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+               + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+         assertTrue("notification has not been received", firedKeepAliveNotification);
+         assertNotNull(clientSessionIDNotResponding[0]);
+         assertEquals(clientSessionID, clientSessionIDNotResponding[0]);
+
+         connector.disconnect();
+
+      } finally
+      {
+         // test is done: wake up every thread still parked in pong()
+         synchronized (factory)
+         {
+            factory.notifyAll();
+         }
+      }
+   }
+
+   public void testKeepAliveWithClientRespondingAndClientNotResponding()
+         throws Exception
+   {
+      KeepAliveFactory notRespondingfactory = createMock(KeepAliveFactory.class);
+      expect(notRespondingfactory.ping()).andStubReturn(null);
+      expect(notRespondingfactory.pong()).andReturn(null).atLeastOnce(); // never a pong
+
+      KeepAliveFactory respondingfactory = createMock(KeepAliveFactory.class);
+      expect(respondingfactory.ping()).andStubReturn(null);
+      expect(respondingfactory.pong()).andReturn(new Pong()).atLeastOnce(); // always a pong
+
+      replay(notRespondingfactory, respondingfactory);
+
+      MinaConnector connectorNotResponding = new MinaConnector(service
+            .getLocator(), notRespondingfactory);
+      MinaConnector connectorResponding = new MinaConnector(service
+            .getLocator(), respondingfactory);
+
+      NIOSession sessionNotResponding = connectorNotResponding.connect();
+      String clientSessionIDNotResponding = sessionNotResponding.getID();
+
+      NIOSession sessionResponding = connectorResponding.connect();
+      String clientSessionIDResponding = sessionResponding.getID();
+
+      final String[] sessionIDNotResponding = new String[1];
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      service.setConnectionExceptionListener(new ConnectionExceptionListener()
+      {
+         public void handleConnectionException(Exception e, String sessionID)
+         {
+            sessionIDNotResponding[0] = sessionID;
+            latch.countDown();
+         }
+      });
+
+      boolean firedKeepAliveNotification = latch.await(KEEP_ALIVE_INTERVAL
+            + KEEP_ALIVE_TIMEOUT + 2, SECONDS);
+      assertTrue("notification has not been received", firedKeepAliveNotification);
+      assertNotNull(sessionIDNotResponding[0]);
+      assertEquals(clientSessionIDNotResponding, sessionIDNotResponding[0]);
+      // compare by value: assertNotSame would only compare object references
+      assertFalse(sessionIDNotResponding[0].equals(clientSessionIDResponding));
+
+      connectorNotResponding.disconnect();
+      connectorResponding.disconnect();
+
+      verify(notRespondingfactory, respondingfactory);
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file

Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/FilterChainSupportTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static org.easymock.EasyMock.createMock;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class FilterChainSupportTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testAddKeepAliveFilterWithIncorrectParameters() throws Exception
+   {
+      final int intervalInSeconds = 5; // deliberately shorter than the timeout
+      final int timeoutInSeconds = 10;
+      KeepAliveFactory mockFactory = createMock(KeepAliveFactory.class);
+      DefaultIoFilterChainBuilder chainBuilder = new DefaultIoFilterChainBuilder();
+
+      try
+      {
+         FilterChainSupport.addKeepAliveFilter(chainBuilder, mockFactory,
+               intervalInSeconds, timeoutInSeconds);
+         fail("the interval must be greater than the timeout");
+      } catch (IllegalArgumentException expected)
+      {
+         // this is the outcome the test wants: the parameters were rejected
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -50,8 +50,6 @@
       AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
       packet.setTargetID(serverPacketHandler.getID());
       
-      packet.setVersion((byte) 1);
-
       try
       {
          client.sendBlocking(packet);
@@ -78,7 +76,7 @@
    @Override
    protected PacketDispatcher startServer() throws Exception
    {
-      service = new MinaService(TCP, "localhost", PORT);
+      service = new MinaService(TCP.toString(), "localhost", PORT);
       service.start();
       return service.getDispatcher();
    }

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -73,7 +73,7 @@
    @Override
    protected void setUp() throws Exception
    {
-      handler = new MinaHandler(PacketDispatcher.client);
+      handler = new MinaHandler(PacketDispatcher.client, null);
 
       packetHandler = new TestPacketHandler();
       PacketDispatcher.client.register(packetHandler);

Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ServerKeepAliveTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class ServerKeepAliveTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MinaService service;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   { // nothing to do here: the test method creates and starts the service itself
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (service != null) { service.stop(); } // still null if the test failed before creating it
+      service = null;
+   }
+
+   public void testKeepAliveWithServerNotResponding() throws Exception
+   {
+      KeepAliveFactory silentServer = createMock(KeepAliveFactory.class);
+
+      // the server does not send any ping
+      expect(silentServer.ping()).andStubReturn(null);
+      // no pong -> the server is not responding
+      expect(silentServer.pong()).andReturn(null).atLeastOnce();
+      replay(silentServer);
+
+      Map<String, String> keepAliveSettings = new HashMap<String, String>();
+      keepAliveSettings.put(KEEP_ALIVE_INTERVAL_KEY, Integer.toString(KEEP_ALIVE_INTERVAL));
+      keepAliveSettings.put(KEEP_ALIVE_TIMEOUT_KEY, Integer.toString(KEEP_ALIVE_TIMEOUT));
+      service = new MinaService(TCP, "localhost", PORT, silentServer);
+      service.setParameters(keepAliveSettings);
+      service.start();
+
+      // written by the listener, read by the test once the latch has opened
+      final String[] reportedSessionID = new String[1];
+      final CountDownLatch failureReported = new CountDownLatch(1);
+      MinaConnector clientConnector = new MinaConnector(service.getLocator());
+      ConnectionExceptionListener failureListener = new ConnectionExceptionListener()
+      {
+         public void handleConnectionException(Exception e, String sessionID)
+         {
+            reportedSessionID[0] = sessionID;
+            failureReported.countDown();
+         }
+      };
+      clientConnector.setConnectionExceptionListener(failureListener);
+
+      NIOSession session = clientConnector.connect();
+
+      // the listener must fire within interval + timeout + 1 second
+      long maxWait = KEEP_ALIVE_INTERVAL + KEEP_ALIVE_TIMEOUT + 1;
+      assertTrue(failureReported.await(maxWait, SECONDS));
+      assertEquals(session.getID(), reportedSessionID[0]);
+
+      clientConnector.disconnect();
+
+      verify(silentServer);
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/TestSupport.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -18,10 +18,14 @@
 
    public static final int MANY_MESSAGES = 500;
 
-   // Attributes ----------------------------------------------------
+   public static final int KEEP_ALIVE_INTERVAL = 2; // in seconds
 
+   public static final int KEEP_ALIVE_TIMEOUT = 1; // in seconds
+
    public static final int PORT = 9090;
 
+   // Attributes ----------------------------------------------------
+
    // Static --------------------------------------------------------
 
    public static String reverse(String text)

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -53,7 +53,7 @@
    @Override
    protected void setUp() throws Exception
    {
-      service = new MinaService(TCP, "localhost", PORT);
+      service = new MinaService(TCP.toString(), "localhost", PORT);
       service.start();
       connector = new MinaConnector(TCP, "localhost", PORT);
       
@@ -88,7 +88,6 @@
       
       byte[] payloadBytes = generatePayload(PAYLOAD);
       AbstractPacket packet = new BytesPacket(payloadBytes);
-      packet.setVersion((byte) 19);
       packet.setTargetID(handlerID);
 
       long start = System.currentTimeMillis();

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-28 13:04:56 UTC (rev 3635)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -6,6 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.wireformat.test.unit;
 
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.BOOLEAN_LENGTH;
 import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.FALSE;
 import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.INT_LENGTH;
 import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.LONG_LENGTH;
@@ -17,19 +18,28 @@
 import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.NULL_STRING;
 import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.UTF_8_ENCODER;
 import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCEL;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_JMSEXCEPTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ROLLBACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETSESSIONID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_HASNEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_NEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_NEXTMESSAGEBLOCK;
@@ -51,6 +61,7 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
 import static org.jboss.messaging.core.remoting.wireformat.test.unit.CodecAssert.assertEqualsByteArrays;
 import static org.jboss.messaging.test.unit.RandomUtil.randomByte;
+import static org.jboss.messaging.test.unit.RandomUtil.randomBytes;
 import static org.jboss.messaging.test.unit.RandomUtil.randomLong;
 import static org.jboss.messaging.test.unit.RandomUtil.randomString;
 
@@ -77,6 +88,7 @@
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
 import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionRequestCodec;
 import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionResponseCodec;
 import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
@@ -93,8 +105,11 @@
 import org.jboss.messaging.core.remoting.codec.GetClientIDResponseCodec;
 import org.jboss.messaging.core.remoting.codec.JMSExceptionMessageCodec;
 import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCancelMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetSessionIDMessageCodec;
 import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
 import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
 import org.jboss.messaging.core.remoting.impl.mina.PacketCodecFactory;
@@ -108,6 +123,7 @@
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
 import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingMessage;
 import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
@@ -128,12 +144,21 @@
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.Ping;
+import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
 import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.util.Logger;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -145,53 +170,46 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PacketTypeTest.class);
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
 
-   private static void addVersion(AbstractPacket packet)
-   {
-      byte version = (byte) 19;
-      packet.setVersion(version);
-   }
-
    private static ByteBuffer encode(int length, Object... args)
    {
       ByteBuffer buffer = ByteBuffer.allocate(length);
       for (Object arg : args)
       {
          if (arg instanceof Byte)
-            buffer.put(((Byte)arg).byteValue());
+            buffer.put(((Byte) arg).byteValue());
          else if (arg instanceof Boolean)
          {
             Boolean bool = (Boolean) arg;
             buffer.put(bool ? TRUE : FALSE);
-         }
-         else if (arg instanceof Integer)
-            buffer.putInt(((Integer)arg).intValue());
+         } else if (arg instanceof Integer)
+            buffer.putInt(((Integer) arg).intValue());
          else if (arg instanceof Long)
-            buffer.putLong(((Long)arg).longValue());
+            buffer.putLong(((Long) arg).longValue());
          else if (arg instanceof Float)
-            buffer.putFloat(((Float)arg).floatValue());
+            buffer.putFloat(((Float) arg).floatValue());
          else if (arg instanceof String)
-            putNullableString((String)arg, buffer);
+            putNullableString((String) arg, buffer);
          else if (arg == null)
             putNullableString(null, buffer);
          else if (arg instanceof byte[])
          {
-            byte[] b = (byte[])arg;
+            byte[] b = (byte[]) arg;
             buffer.putInt(b.length);
             buffer.put(b);
-         }
-         else if (arg instanceof long[])
+         } else if (arg instanceof long[])
          {
-            long[] longs = (long[])arg;
+            long[] longs = (long[]) arg;
             for (long l : longs)
             {
                buffer.putLong(l);
             }
-         }
-         else
+         } else
          {
             fail("no encoding defined for " + arg);
          }
@@ -205,7 +223,7 @@
       if (string == null)
       {
          buffer.put(NULL_STRING);
-      } else 
+      } else
       {
          buffer.put(NOT_NULL_STRING);
          UTF_8_ENCODER.reset();
@@ -220,7 +238,6 @@
       checkHeaderBytes(packet, buffer.buffer().buf());
 
       assertEquals(buffer.get(), packet.getType().byteValue());
-      assertEquals(buffer.get(), packet.getVersion());
 
       String targetID = packet.getTargetID();
       if (NO_ID_SET.equals(packet.getTargetID()))
@@ -229,7 +246,8 @@
       if (NO_ID_SET.equals(packet.getCallbackID()))
          callbackID = null;
 
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+            + BOOLEAN_LENGTH;
       assertEquals(buffer.getInt(), headerLength);
       assertEquals(buffer.getLong(), packet.getCorrelationID());
 
@@ -239,28 +257,35 @@
       String bufferCallbackID = buffer.getNullableString();
       if (bufferCallbackID == null)
          bufferCallbackID = NO_ID_SET;
+      boolean oneWay = buffer.getBoolean();
 
       assertEquals(bufferTargetID, packet.getTargetID());
       assertEquals(bufferCallbackID, packet.getCallbackID());
+      assertEquals(oneWay, packet.isOneWay());
    }
 
    private static void checkHeaderBytes(AbstractPacket packet, ByteBuffer actual)
    {
-      String targetID = (packet.getTargetID().equals(NO_ID_SET)? null : packet.getTargetID());
-      String callbackID = (packet.getCallbackID().equals(NO_ID_SET)? null : packet.getCallbackID());
+      String targetID = (packet.getTargetID().equals(NO_ID_SET) ? null : packet
+            .getTargetID());
+      String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
+            : packet.getCallbackID());
 
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID);
-      ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH + headerLength);
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+            + BOOLEAN_LENGTH;
+      ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH
+            + headerLength);
       expected.put(packet.getType().byteValue());
-      expected.put(packet.getVersion());
-      
+
       expected.putInt(headerLength);
       expected.putLong(packet.getCorrelationID());
       putNullableString(targetID, expected);
       putNullableString(callbackID, expected);
+      expected.put(packet.isOneWay() ? TRUE : FALSE);
       expected.flip();
 
-      assertEqualsByteArrays(expected.remaining(), expected.array(), actual.array());
+      assertEqualsByteArrays(expected.remaining(), expected.array(), actual
+            .array());
    }
 
    private static void checkBodyIsEmpty(RemotingBuffer buffer)
@@ -287,7 +312,6 @@
    public void testNullPacket() throws Exception
    {
       NullPacket packet = new NullPacket();
-      packet.setVersion(randomByte());
       packet.setCallbackID(randomString());
       packet.setCorrelationID(randomLong());
       packet.setTargetID(randomString());
@@ -305,7 +329,6 @@
       NullPacket p = (NullPacket) decodedPacket;
 
       assertEquals(NULL, p.getType());
-      assertEquals(packet.getVersion(), p.getVersion());
       assertEquals(packet.getCallbackID(), p.getCallbackID());
       assertEquals(packet.getCorrelationID(), p.getCorrelationID());
       assertEquals(packet.getTargetID(), p.getTargetID());
@@ -316,7 +339,6 @@
       JMSException e = new InvalidDestinationException(
             "testJMSExceptionMessage");
       JMSExceptionMessage message = new JMSExceptionMessage(e);
-      addVersion(message);
 
       AbstractPacketCodec<JMSExceptionMessage> codec = new JMSExceptionMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
@@ -334,15 +356,48 @@
             .getException().getMessage());
    }
 
+   public void testPing() throws Exception
+   { // Round-trip test: encode a Ping, check its header and body, then decode it back.
+      Ping packet = new Ping();
+      AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+            .createCodecForEmptyPacket(PING, Ping.class);
+      // Encode first, then inspect the encoded form before decoding it.
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet); // compares the encoded header fields with the packet's values
+      checkBodyIsEmpty(buffer); // presumably asserts nothing follows the header (helper body not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // The decoded packet must have the expected class and packet type.
+      assertTrue(decodedPacket instanceof Ping);
+      assertEquals(PING, decodedPacket.getType());
+   }
+
+   public void testPong() throws Exception
+   { // Round-trip test: encode a Pong, check its header and body, then decode it back.
+      Pong packet = new Pong();
+      AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+            .createCodecForEmptyPacket(PONG, Pong.class);
+      // Encode first, then inspect the encoded form before decoding it.
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet); // compares the encoded header fields with the packet's values
+      checkBodyIsEmpty(buffer); // presumably asserts nothing follows the header (helper body not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // The decoded packet must have the expected class and packet type.
+      assertTrue(decodedPacket instanceof Pong);
+      assertEquals(PONG, decodedPacket.getType());
+   }
+
    public void testTextPacket() throws Exception
    {
       TextPacket packet = new TextPacket("testTextPacket");
-      addVersion(packet);
       AbstractPacketCodec<TextPacket> codec = new TextPacketCodec();
 
       SimpleRemotingBuffer buffer = encode(packet, codec);
       checkHeader(buffer, packet);
-      checkBody(buffer, packet.getText());      
+      checkBody(buffer, packet.getText());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -353,7 +408,44 @@
       assertEquals(TEXT, p.getType());
       assertEquals(packet.getText(), p.getText());
    }
+   
+   public void testBytesPacket() throws Exception
+   { // Round-trip test for BytesPacket: the decoded bytes must equal the original bytes.
+      BytesPacket packet = new BytesPacket(randomBytes()); // payload comes from RandomUtil.randomBytes()
 
+      AbstractPacketCodec codec = new BytesPacketCodec();
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet); // compares the encoded header fields with the packet's values
+      checkBody(buffer, packet.getBytes()); // presumably asserts the body encodes exactly these bytes (helper not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
+      assertTrue(decodedPacket instanceof BytesPacket);
+      BytesPacket p = (BytesPacket) decodedPacket;
+      // Packet type and payload must survive the round trip.
+      assertEquals(BYTES, p.getType());
+      assertEqualsByteArrays(packet.getBytes(), p.getBytes());
+   }
+
+   public void testSetSessionIDMessage() throws Exception
+   { // Round-trip test for SetSessionIDMessage: the session ID must survive encode/decode.
+      SetSessionIDMessage message = new SetSessionIDMessage(randomString());
+
+      AbstractPacketCodec codec = new SetSessionIDMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBody(buffer, message.getSessionID()); // presumably asserts the body encodes the session ID (helper not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // Class, packet type and session ID must match the original message.
+      assertTrue(decodedPacket instanceof SetSessionIDMessage);
+      SetSessionIDMessage decodedMessage = (SetSessionIDMessage) decodedPacket;
+      assertEquals(MSG_SETSESSIONID, decodedMessage.getType());
+      assertEquals(message.getSessionID(), decodedMessage.getSessionID());
+   }
+   
    public void testCreateConnectionRequest() throws Exception
    {
       byte version = randomByte();
@@ -366,13 +458,14 @@
       String clientID = null;
 
       CreateConnectionRequest request = new CreateConnectionRequest(version,
-            remotingSessionID, clientVMID, username, password, prefetchSize, dupsOkBatchSize, null);
-      addVersion(request);
-      
+            remotingSessionID, clientVMID, username, password, prefetchSize,
+            dupsOkBatchSize, null);
+
       AbstractPacketCodec<CreateConnectionRequest> codec = new ConnectionFactoryCreateConnectionRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, version, remotingSessionID, clientVMID, username, password, prefetchSize, dupsOkBatchSize, clientID);      
+      checkBody(buffer, version, remotingSessionID, clientVMID, username,
+            password, prefetchSize, dupsOkBatchSize, clientID);
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -388,13 +481,12 @@
       assertEquals(request.getUsername(), decodedRequest.getUsername());
       assertEquals(request.getPassword(), decodedRequest.getPassword());
    }
-   
+
    public void testCreateConnectionResponse() throws Exception
    {
       CreateConnectionResponse response = new CreateConnectionResponse(
             randomString());
-      addVersion(response);
-      
+
       AbstractPacketCodec<CreateConnectionResponse> codec = new ConnectionFactoryCreateConnectionResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -409,16 +501,16 @@
       assertEquals(response.getConnectionID(), decodedResponse
             .getConnectionID());
    }
-   
+
    public void testCreateSessionRequest() throws Exception
    {
       CreateSessionRequest request = new CreateSessionRequest(true, 0, false);
-      addVersion(request);
-      
+
       AbstractPacketCodec codec = new CreateSessionRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
+      checkBody(buffer, request.isTransacted(), request
+            .getAcknowledgementMode(), request.isXA());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -436,8 +528,7 @@
    {
       CreateSessionResponse response = new CreateSessionResponse(
             randomString(), 23);
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new CreateSessionResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -457,8 +548,7 @@
    public void testSendMessage() throws Exception
    {
       SessionSendMessage packet = new SessionSendMessage(new MessageImpl());
-      addVersion(packet);
-      
+
       AbstractPacketCodec codec = new SessionSendMessageCodec();
       SimpleRemotingBuffer buffer = encode(packet, codec);
       checkHeader(buffer, packet);
@@ -476,16 +566,17 @@
 
    public void testCreateConsumerRequest() throws Exception
    {
-      Destination destination = new DestinationImpl(DestinationType.QUEUE, "testCreateConsumerRequest", false);
+      Destination destination = new DestinationImpl(DestinationType.QUEUE,
+            "testCreateConsumerRequest", false);
       CreateConsumerRequest request = new CreateConsumerRequest(destination,
             "color = 'red'", false, "subscription", false);
-      addVersion(request);
-      
+
       AbstractPacketCodec codec = new CreateConsumerRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, AbstractPacketCodec.encode(destination), request.getSelector(),
-            request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer());
+      checkBody(buffer, AbstractPacketCodec.encode(destination), request
+            .getSelector(), request.isNoLocal(), request.getSubscriptionName(),
+            request.isConnectionConsumer());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -499,15 +590,15 @@
       assertEquals(request.getSubscriptionName(), decodedRequest
             .getSubscriptionName());
       assertEquals(request.isConnectionConsumer(), decodedRequest
-            .isConnectionConsumer());;
+            .isConnectionConsumer());
+      ;
    }
 
    public void testCreateDestinationRequest() throws Exception
    {
       CreateDestinationRequest request = new CreateDestinationRequest(
             "testCreateDestinationRequest", false);
-      addVersion(request);
-      
+
       AbstractPacketCodec codec = new CreateDestinationRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
@@ -529,8 +620,7 @@
 
       CreateDestinationResponse response = new CreateDestinationResponse(
             destination);
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new CreateDestinationResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -548,12 +638,12 @@
 
    public void testCreateDestinationResponseForTopic() throws Exception
    {
-      JBossTopic destination = new JBossTopic("testCreateDestinationResponseForTopic");
+      JBossTopic destination = new JBossTopic(
+            "testCreateDestinationResponseForTopic");
 
       CreateDestinationResponse response = new CreateDestinationResponse(
             destination);
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new CreateDestinationResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -574,8 +664,7 @@
 
       CreateConsumerResponse response = new CreateConsumerResponse(
             randomString(), 23, 42, randomLong());
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new CreateConsumerResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -598,7 +687,6 @@
    public void testStartConnectionMessage() throws Exception
    {
       StartConnectionMessage packet = new StartConnectionMessage();
-      addVersion(packet);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_STARTCONNECTION, StartConnectionMessage.class);
@@ -616,7 +704,6 @@
    public void testStopConnectionMessage() throws Exception
    {
       StopConnectionMessage packet = new StopConnectionMessage();
-      addVersion(packet);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_STOPCONNECTION, StopConnectionMessage.class);
@@ -634,7 +721,6 @@
    public void testChangeRateMessage() throws Exception
    {
       ConsumerChangeRateMessage message = new ConsumerChangeRateMessage(0.63f);
-      addVersion(message);
       AbstractPacketCodec codec = new ConsumerChangeRateMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
@@ -653,13 +739,12 @@
    {
       Message msg = new MessageImpl();
       DeliverMessage message = new DeliverMessage(msg, randomLong(), 23);
-      addVersion(message);
-      
+
       AbstractPacketCodec codec = new DeliverMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
-      checkBody(buffer, encodeMessage(msg),
-            message.getDeliveryID(), message.getDeliveryCount());
+      checkBody(buffer, encodeMessage(msg), message.getDeliveryID(), message
+            .getDeliveryCount());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -674,31 +759,122 @@
             .getDeliveryCount());
    }
 
+   public void testSessionAcknowledgeMessage() throws Exception
+   { // Round-trip test for SessionAcknowledgeMessage: delivery ID and all-up-to flag must survive.
+      SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(
+            randomLong(), true);
 
+      AbstractPacketCodec codec = new SessionAcknowledgeMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBody(buffer, message.getDeliveryID(), message.isAllUpTo()); // presumably asserts the body holds these values in this order (helper not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // Class, packet type and both fields must match the original message.
+      assertTrue(decodedPacket instanceof SessionAcknowledgeMessage);
+      SessionAcknowledgeMessage decodedMessage = (SessionAcknowledgeMessage) decodedPacket;
+      assertEquals(MSG_ACKNOWLEDGE, decodedMessage.getType());
+      assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
+      assertEquals(message.isAllUpTo(), decodedMessage.isAllUpTo());
+   }
+
+   public void testSessionCancelMessage() throws Exception
+   { // Round-trip test for SessionCancelMessage: delivery ID and expired flag must survive.
+      SessionCancelMessage message = new SessionCancelMessage(randomLong(),
+            true);
+
+      AbstractPacketCodec codec = new SessionCancelMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBody(buffer, message.getDeliveryID(), message.isExpired()); // presumably asserts the body holds these values in this order (helper not shown here)
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+      // Decode from the same buffer.
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // Class, packet type and both fields must match the original message.
+      assertTrue(decodedPacket instanceof SessionCancelMessage);
+      SessionCancelMessage decodedMessage = (SessionCancelMessage) decodedPacket;
+      assertEquals(MSG_CANCEL, decodedMessage.getType());
+      assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
+      assertEquals(message.isExpired(), decodedMessage.isExpired());
+   }
+
+   public void testSessionCommitMessage() throws Exception
+   { // Round-trip test for the body-less SessionCommitMessage.
+      SessionCommitMessage message = new SessionCommitMessage();
+      // The codec comes from the factory helper for packets that carry no body.
+      AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+            MSG_COMMIT, SessionCommitMessage.class);
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBodyIsEmpty(buffer); // presumably asserts nothing follows the header (helper body not shown here)
+
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // The decoded packet must have the expected class and packet type.
+      assertTrue(decodedPacket instanceof SessionCommitMessage);
+      assertEquals(MSG_COMMIT, decodedPacket.getType());
+   }
+
+   public void testSessionRollbackMessage() throws Exception
+   { // Round-trip test for the body-less SessionRollbackMessage.
+      SessionRollbackMessage message = new SessionRollbackMessage();
+      // The codec comes from the factory helper for packets that carry no body.
+      AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+            MSG_ROLLBACK, SessionRollbackMessage.class);
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBodyIsEmpty(buffer); // presumably asserts nothing follows the header (helper body not shown here)
+
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // The decoded packet must have the expected class and packet type.
+      assertTrue(decodedPacket instanceof SessionRollbackMessage);
+      assertEquals(MSG_ROLLBACK, decodedPacket.getType());
+   }
+   
+   public void testSessionRecoverMessage() throws Exception
+   { // Round-trip test for the body-less SessionRecoverMessage.
+      SessionRecoverMessage message = new SessionRecoverMessage();
+      // The codec comes from the factory helper for packets that carry no body.
+      AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
+            MSG_RECOVER, SessionRecoverMessage.class);
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message); // compares the encoded header fields with the message's values
+      checkBodyIsEmpty(buffer); // presumably asserts nothing follows the header (helper body not shown here)
+
+      buffer.rewind(); // presumably resets the read position so decode() starts at the header -- TODO confirm
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+      // The decoded packet must have the expected class and packet type.
+      assertTrue(decodedPacket instanceof SessionRecoverMessage);
+      assertEquals(MSG_RECOVER, decodedPacket.getType());
+   }
+
    public void testClosingMessage() throws Exception
    {
       ClosingMessage request = new ClosingMessage();
-      addVersion(request);
-      
-      AbstractPacketCodec codec =  PacketCodecFactory.createCodecForEmptyPacket(
+
+      AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             PacketType.MSG_CLOSING, ClosingMessage.class);
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
       checkBodyIsEmpty(buffer);
+
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof ClosingMessage);
       ClosingMessage decodedRequest = (ClosingMessage) decodedPacket;
-      assertEquals(PacketType.MSG_CLOSING, decodedRequest.getType());     
+      assertEquals(PacketType.MSG_CLOSING, decodedRequest.getType());
    }
 
-
    public void testCloseMessage() throws Exception
    {
       CloseMessage message = new CloseMessage();
-      addVersion(message);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_CLOSE, CloseMessage.class);
@@ -714,11 +890,9 @@
       assertEquals(MSG_CLOSE, decodedMessage.getType());
    }
 
-
    public void testGetClientIDRequest() throws Exception
    {
       GetClientIDRequest request = new GetClientIDRequest();
-      addVersion(request);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_GETCLIENTID, GetClientIDRequest.class);
@@ -736,8 +910,7 @@
    public void testGetClientIDResponse() throws Exception
    {
       GetClientIDResponse response = new GetClientIDResponse(randomString());
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new GetClientIDResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -755,8 +928,7 @@
    public void testSetClientIDMessage() throws Exception
    {
       SetClientIDMessage message = new SetClientIDMessage(randomString());
-      addVersion(message);
-      
+
       AbstractPacketCodec codec = new SetClientIDMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
@@ -770,18 +942,19 @@
       assertEquals(MSG_SETCLIENTID, decodedMessage.getType());
       assertEquals(message.getClientID(), decodedMessage.getClientID());
    }
- 
+
    public void testCreateBrowserRequest() throws Exception
    {
-      Destination destination = new DestinationImpl(DestinationType.QUEUE, "testCreateBrowserRequest", false);
+      Destination destination = new DestinationImpl(DestinationType.QUEUE,
+            "testCreateBrowserRequest", false);
       CreateBrowserRequest request = new CreateBrowserRequest(destination,
             "color = 'red'");
-      addVersion(request);
-      
+
       AbstractPacketCodec codec = new CreateBrowserRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
-      checkBody(buffer, AbstractPacketCodec.encode(destination), request.getSelector());
+      checkBody(buffer, AbstractPacketCodec.encode(destination), request
+            .getSelector());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -796,8 +969,7 @@
    public void testCreateBrowserResponse() throws Exception
    {
       CreateBrowserResponse response = new CreateBrowserResponse(randomString());
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new CreateBrowserResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -815,7 +987,6 @@
    public void testBrowserResetMessage() throws Exception
    {
       BrowserResetMessage message = new BrowserResetMessage();
-      addVersion(message);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_BROWSER_RESET, BrowserResetMessage.class);
@@ -833,7 +1004,6 @@
    public void testBrowserHasNextMessageRequest() throws Exception
    {
       BrowserHasNextMessageRequest request = new BrowserHasNextMessageRequest();
-      addVersion(request);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_BROWSER_HASNEXTMESSAGE, BrowserHasNextMessageRequest.class);
@@ -852,7 +1022,6 @@
    {
       BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(
             false);
-      addVersion(response);
       AbstractPacketCodec codec = new BrowserHasNextMessageResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -870,7 +1039,6 @@
    public void testBrowserNextMessageRequest() throws Exception
    {
       BrowserNextMessageRequest request = new BrowserNextMessageRequest();
-      addVersion(request);
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_BROWSER_NEXTMESSAGE, BrowserNextMessageRequest.class);
@@ -889,8 +1057,7 @@
    {
       Message msg = new MessageImpl();
       BrowserNextMessageResponse response = new BrowserNextMessageResponse(msg);
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new BrowserNextMessageResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
@@ -910,8 +1077,7 @@
    {
       BrowserNextMessageBlockRequest request = new BrowserNextMessageBlockRequest(
             randomLong());
-      addVersion(request);
-      
+
       AbstractPacketCodec codec = new BrowserNextMessageBlockRequestCodec();
       SimpleRemotingBuffer buffer = encode(request, codec);
       checkHeader(buffer, request);
@@ -931,12 +1097,12 @@
       Message[] messages = new Message[] { new MessageImpl(), new MessageImpl() };
       BrowserNextMessageBlockResponse response = new BrowserNextMessageBlockResponse(
             messages);
-      addVersion(response);
-      
+
       AbstractPacketCodec codec = new BrowserNextMessageBlockResponseCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
-      checkBody(buffer, messages.length, BrowserNextMessageBlockResponseCodec.encode(messages));
+      checkBody(buffer, messages.length, BrowserNextMessageBlockResponseCodec
+            .encode(messages));
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -954,8 +1120,7 @@
    {
       UnsubscribeMessage message = new UnsubscribeMessage(
             "testUnsubscribeMessage");
-      addVersion(message);
-      
+
       AbstractPacketCodec codec = new UnsubscribeMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
@@ -973,11 +1138,11 @@
 
    public void testAddTemporaryDestinationMessage() throws Exception
    {
-      Destination destination = new DestinationImpl(DestinationType.QUEUE, "testAddTemporaryDestinationMessage", false);
+      Destination destination = new DestinationImpl(DestinationType.QUEUE,
+            "testAddTemporaryDestinationMessage", false);
       AddTemporaryDestinationMessage message = new AddTemporaryDestinationMessage(
             destination);
-      addVersion(message);
-      
+
       AbstractPacketCodec codec = new AddTemporaryDestinationMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
@@ -994,11 +1159,12 @@
 
    public void testDeleteTemporaryDestinationMessage() throws Exception
    {
-      Destination destination = new DestinationImpl(DestinationType.QUEUE, "testDeleteTemporaryDestinationMessage", false);;
+      Destination destination = new DestinationImpl(DestinationType.QUEUE,
+            "testDeleteTemporaryDestinationMessage", false);
+      ;
       DeleteTemporaryDestinationMessage message = new DeleteTemporaryDestinationMessage(
             destination);
-      addVersion(message);
-      
+
       AbstractPacketCodec codec = new DeleteTemporaryDestinationMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
@@ -1021,6 +1187,8 @@
    private SimpleRemotingBuffer encode(AbstractPacket packet,
          AbstractPacketCodec codec) throws Exception
    {
+      log.debug("encode " + packet);
+
       IoBuffer b = IoBuffer.allocate(256);
       b.setAutoExpand(true);
 

Added: trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/UnresponsiveServerTest.java	2008-01-28 14:35:41 UTC (rev 3636)
@@ -0,0 +1,138 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.crash;
+
+import static java.lang.Boolean.TRUE;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.DISABLE_INVM_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_INTERVAL_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.MinaService.KEEP_ALIVE_TIMEOUT_KEY;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_INTERVAL;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.KEEP_ALIVE_TIMEOUT;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
+import org.jboss.test.messaging.jms.JMSTestCase;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class UnresponsiveServerTest extends JMSTestCase
+{
+   // Constants -----------------------------------------------------
+
+   private MinaService minaService;
+   private Map<String, String> originalParameters;
+
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public UnresponsiveServerTest(String name)
+   {
+      super(name);
+   }
+   
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+         
+      minaService = servers.get(0).getMessagingServer().getMinaService();
+      originalParameters = new HashMap<String, String>(minaService.getLocator().getParameters());
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      minaService.stop();
+      minaService.setParameters(originalParameters);
+      minaService.setKeepAliveFactory(new ServerKeepAliveFactory());
+      minaService.start();
+
+      super.tearDown();
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testExceptionListenerWhenServerIsUnresponsive()
+         throws Exception
+   {
+      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
+      // server does not send ping
+      expect(factory.ping()).andStubReturn(null);
+      // no pong -> server is not responding
+      expect(factory.pong()).andReturn(null).atLeastOnce();
+
+      ExceptionListener listener = createMock(ExceptionListener.class);
+      listener.onException(isA(JMSException.class));
+      expectLastCall().once();
+
+      replay(listener, factory);
+      
+      minaService.stop();
+      Map<String, String> parameters = new HashMap<String, String>();
+      parameters.put(KEEP_ALIVE_INTERVAL_KEY, Integer.toString(KEEP_ALIVE_INTERVAL));
+      parameters.put(KEEP_ALIVE_TIMEOUT_KEY, Integer.toString(KEEP_ALIVE_TIMEOUT));
+      parameters.put(DISABLE_INVM_KEY, TRUE.toString());
+      minaService.setParameters(parameters);
+      minaService.setKeepAliveFactory(factory);
+      minaService.start();
+      
+      QueueConnection conn = getConnectionFactory().createQueueConnection();
+      conn.setExceptionListener(listener);
+
+      // FIXME the hard-coded delays below should be deduced from MinaConnector somehow...
+      Thread.sleep((5 + 10 + 1) * 1000);
+      
+      verify(listener, factory);
+      
+      conn.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list