[jboss-cvs] JBoss Messaging SVN: r5066 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 3 14:27:15 EDT 2008


Author: timfox
Date: 2008-10-03 14:27:14 -0400 (Fri, 03 Oct 2008)
New Revision: 5066

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
Log:
More session replication and failover


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -51,7 +51,7 @@
    {     
       this.clientSession = clientSesssion;
    }
-
+      
    public void handlePacket(final Packet packet)
    {
       byte type = packet.getType();
@@ -97,4 +97,8 @@
          log.error("Failed to handle packet", e);
       }
    }
+   
+   public void rehandlePacket(final Packet packet)
+   {      
+   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,4 +30,6 @@
 public interface ChannelHandler
 {
    void handlePacket(Packet packet);
+      
+   //void rehandlePacket(Packet packet);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -50,4 +50,10 @@
    boolean isReplicateBlocking();
    
    boolean isWriteAlways();
+   
+   boolean isReHandleResponseOnFailure();
+   
+   boolean isDuplicate();
+   
+   void setDuplicate(boolean duplicate);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -95,6 +95,7 @@
 import org.jboss.messaging.core.remoting.ResponseNotifier;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -778,7 +779,7 @@
          }
          case NULL_RESPONSE:
          {
-            packet = new NullResponseMessage(false);
+            packet = new NullResponseMessage();
             break;
          }
          case SESS_MANAGEMENT_SEND:
@@ -837,6 +838,10 @@
       private Thread blockThread;
 
       private ResponseNotifier responseNotifier;
+      
+      private volatile boolean justFailedOver;
+      
+      private volatile Packet lastPacketReceived;
 
       private ChannelImpl(final RemotingConnectionImpl connection,
                           final long id,
@@ -895,8 +900,7 @@
       }
 
       public int getLastReceivedCommandID()
-      {
-         //log.info("getting last received command id, last received packet is " + this.lastReceivedPacket);
+      {        
          return lastReceivedCommandID;
       }
 
@@ -913,13 +917,15 @@
                try
                {
                   addToCache(packet);                  
+                  
+                  lastResponseSent = packet;               
+                  
+                  connection.doWrite(packet);              
                }
                finally
                {
                   lock.unlock();
-               }
-                  
-               connection.doWrite(packet);               
+               }                                 
             }
          }
       }
@@ -938,83 +944,83 @@
       // This must never called by more than one thread concurrently
       public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
       {
-         // For now we only allow one blocking request-response at a time per
-         // channel
-         // We can relax this but it will involve some kind of correlation id
-         synchronized (waitLock)
+         packet.setChannelID(id);
+
+         lock.lock();
+         try
          {
-            try
+            addToCache(packet);                  
+                  
+            // For now we only allow one blocking request-response at a time per
+            // channel
+            // We can relax this but it will involve some kind of correlation id
+            synchronized (waitLock)
             {
-               blockThread = Thread.currentThread();
-
-               responseNotifier = notifier;
-
-               response = null;
-
-               packet.setChannelID(id);
-
-               lock.lock();
                try
                {
-                  addToCache(packet);                  
-               }
-               finally
-               {
-                  lock.unlock();
-               }
-
-               connection.doWrite(packet);
-
-               long toWait = connection.blockingCallTimeout;
-
-               long start = System.currentTimeMillis();
-
-               while (response == null && toWait > 0)
-               {
-                  try
+                  blockThread = Thread.currentThread();
+   
+                  responseNotifier = notifier;
+   
+                  response = null;
+                 
+                  connection.doWrite(packet);
+   
+                  long toWait = connection.blockingCallTimeout;
+   
+                  long start = System.currentTimeMillis();
+   
+                  while (response == null && toWait > 0)
                   {
-                     waitLock.wait(toWait);
-                  }
-                  catch (final InterruptedException e)
-                  {
-                     if (interruptBlockOnFailure)
+                     try
                      {
-                        if (connection.destroyed)
+                        waitLock.wait(toWait);
+                     }
+                     catch (final InterruptedException e)
+                     {
+                        if (interruptBlockOnFailure)
                         {
-                           throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+                           if (connection.destroyed)
+                           {
+                              throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+                           }
                         }
                      }
+   
+                     final long now = System.currentTimeMillis();
+   
+                     toWait -= now - start;
+   
+                     start = now;
                   }
-
-                  final long now = System.currentTimeMillis();
-
-                  toWait -= now - start;
-
-                  start = now;
+   
+                  if (response == null)
+                  {
+                     throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                  "Timed out waiting for response when sending packet " + packet.getType());
+                  }
+   
+                  if (response.getType() == PacketImpl.EXCEPTION)
+                  {
+                     final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+   
+                     throw mem.getException();
+                  }
+                  else
+                  {
+                     return response;
+                  }
                }
-
-               if (response == null)
+               finally
                {
-                  throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                               "Timed out waiting for response when sending packet " + packet.getType());
+                  blockThread = null;
                }
-
-               if (response.getType() == PacketImpl.EXCEPTION)
-               {
-                  final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
-
-                  throw mem.getException();
-               }
-               else
-               {
-                  return response;
-               }
             }
-            finally
-            {
-               blockThread = null;
-            }
          }
+         finally
+         {
+            lock.unlock();
+         }
       }
 
       public void replicatePacket(final Packet packet) throws MessagingException
@@ -1111,9 +1117,11 @@
             }
          }
       }
-
+      
+      private volatile Packet lastResponseSent;
+      
       public void transferConnection(final RemotingConnection newConnection)
-      {
+      {        
          // Needs to synchronize on the connection to make sure no packets from
          // the old connection
          // get processed after transfer has occurred
@@ -1132,6 +1140,21 @@
             connection = rnewConnection;
 
             replicatingChannel = null;
+            
+            justFailedOver = true;
+            
+            //Send back any blocking responses that may be required
+            
+            if (lastPacketReceived != null && lastPacketReceived.isReHandleResponseOnFailure())
+            {
+               if (lastResponseSent != null && lastResponseSent instanceof DuplicablePacket)
+               {
+                  lastResponseSent.setDuplicate(true);
+                  
+                  send(lastResponseSent);
+               }
+            }
+            
          }
       }
 
@@ -1157,6 +1180,8 @@
 
       private void handlePacket(final Packet packet)
       {
+         lastResponseSent = null;
+         
          if (packet.getType() == PACKETS_CONFIRMED)
          {
             if (resendCache != null)
@@ -1220,20 +1245,31 @@
                }
             }
 
+            lastPacketReceived = packet;
+            
             if (packet.isResponse())
-            {
-               synchronized (waitLock)
+            {              
+               if (packet.isDuplicate() && !justFailedOver)
                {
-                  response = packet;
-
-                  checkConfirmation(packet);
-
-                  if (responseNotifier != null)
+                  //Ignore it - duplicate packets can only come just after failover
+               }
+               else
+               {
+                  synchronized (waitLock)
                   {
-                     responseNotifier.onResponseReceived();
+                     response = packet;
+   
+                     checkConfirmation(packet);
+   
+                     if (responseNotifier != null)
+                     {
+                        responseNotifier.onResponseReceived();
+                     }
+   
+                     waitLock.notify();
                   }
-
-                  waitLock.notify();
+                  
+                  justFailedOver = false;
                }
             }
             else if (handler != null)
@@ -1393,7 +1429,5 @@
             throw new IllegalArgumentException("Invalid packet: " + packet);
          }
       }
-
    }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -64,7 +64,7 @@
    {
       return true;
    }
- 
+   
    public int getServerVersion()
    {
       return serverVersion;

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
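+/**
+ * A packet whose body carries a boolean "duplicate" flag, so that a response
+ * which is sent again after a connection transfer can be marked as a duplicate.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 3 Oct 2008 15:00:31
+ *
+ */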
+public class DuplicablePacket extends PacketImpl
+{
+   private boolean duplicate;
+
+   public DuplicablePacket(final byte type)
+   {
+      super(type);
+   }
+
+   public boolean isDuplicate()
+   {
+      return duplicate;
+   }
+   
+   public void setDuplicate(final boolean duplicate)
+   {
+      this.duplicate = duplicate;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putBoolean(duplicate);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      duplicate = buffer.getBoolean();
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
@@ -32,7 +32,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class MessagingExceptionMessage extends PacketImpl
+public class MessagingExceptionMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
@@ -50,7 +50,7 @@
 
       this.exception = exception;
    }
-   
+
    public MessagingExceptionMessage()
    {
       super(EXCEPTION);
@@ -62,43 +62,45 @@
    {
       return true;
    }
-   
+
    public MessagingException getException()
    {
       return exception;
    }
-   
+
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putInt(exception.getCode());
       buffer.putNullableString(exception.getMessage());
    }
-   
+
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       int code = buffer.getInt();
       String msg = buffer.getNullableString();
       exception = new MessagingException(code, msg);
    }
-   
+
    @Override
    public String toString()
    {
       return getParentString() + ", exception= " + exception + "]";
    }
-   
+
    public boolean equals(Object other)
    {
       if (other instanceof MessagingExceptionMessage == false)
       {
          return false;
       }
-            
+
       MessagingExceptionMessage r = (MessagingExceptionMessage)other;
-      
+
       return super.equals(other) && this.exception.equals(r.exception);
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -12,22 +12,32 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  */
-public class NullResponseMessage extends PacketImpl
+public class NullResponseMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
+   //This does not need to be written over the wire
    private final boolean writeAlways;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
+   public NullResponseMessage()
+   {
+      super(NULL_RESPONSE);
+
+      this.writeAlways = false;
+   }
+   
    public NullResponseMessage(final boolean writeAlways)
    {
       super(NULL_RESPONSE);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -232,6 +232,20 @@
    {
       return false;
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return false;
+   }
+   
+   public boolean isDuplicate()
+   {
+      return false;
+   }
+   
+   public void setDuplicate(final boolean duplicate)
+   {      
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -103,6 +103,11 @@
       return true;
    }
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -64,6 +64,11 @@
       address = buffer.getSimpleString();
    }
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean equals(Object other)
    {
       if (other instanceof SessionBindingQueryMessage == false)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -35,7 +35,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public class SessionBindingQueryResponseMessage extends PacketImpl
+public class SessionBindingQueryResponseMessage extends DuplicablePacket
 {
    private boolean exists;
    
@@ -72,6 +72,7 @@
    
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putBoolean(exists);
       buffer.putInt(queueNames.size());      
       for (SimpleString queueName: queueNames)
@@ -82,6 +83,7 @@
    
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       exists = buffer.getBoolean();      
       int numQueues = buffer.getInt();      
       queueNames = new ArrayList<SimpleString>(numQueues);      

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -69,6 +69,11 @@
    {
       browserID = buffer.getLong();
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -70,6 +70,11 @@
    {
       browserID = buffer.getLong();       
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -31,7 +31,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class SessionBrowserHasNextMessageResponseMessage extends PacketImpl
+public class SessionBrowserHasNextMessageResponseMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
@@ -69,11 +69,13 @@
    
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putBoolean(hasNext);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       hasNext = buffer.getBoolean();       
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -70,6 +70,11 @@
    {
       browserID = buffer.getLong();       
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -71,6 +71,11 @@
    {
       browserID = buffer.getLong();       
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -34,6 +34,11 @@
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    @Override
    public boolean equals(final Object other)
    {
@@ -47,6 +52,8 @@
       return super.equals(other);
    }
    
+   
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -77,6 +77,11 @@
    {
       return true;
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,9 +58,14 @@
    {
       super(SESS_CREATEBROWSER);
    }
-
+   
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public SimpleString getQueueName()
    {
       return queueName;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -63,10 +63,15 @@
    public SessionCreateConsumerMessage()
    {
       super(SESS_CREATECONSUMER);   
-   }
+   }   
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,7 +30,7 @@
  * 
  * @version <tt>$Revision$</tt>
  */
-public class SessionCreateConsumerResponseMessage extends PacketImpl
+public class SessionCreateConsumerResponseMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
@@ -68,11 +68,13 @@
    
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putInt(windowSize);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       windowSize = buffer.getInt();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -69,6 +69,11 @@
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,7 +30,7 @@
  * 
  * @version <tt>$Revision$</tt>
  */
-public class SessionCreateProducerResponseMessage extends PacketImpl
+public class SessionCreateProducerResponseMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
@@ -79,7 +79,6 @@
    	return maxRate;
    }
 
-
    public SimpleString getAutoGroupId()
    {
       return autoGroupId;
@@ -87,6 +86,7 @@
 
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putInt(initialCredits);
       buffer.putInt(maxRate);
       buffer.putNullableSimpleString(autoGroupId);
@@ -94,6 +94,7 @@
    
    public void decodeBody(final MessagingBuffer buffer)
    {     
+      super.decodeBody(buffer);
       initialCredits = buffer.getInt();
       maxRate = buffer.getInt();
       autoGroupId = buffer.getNullableSimpleString();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -128,6 +128,11 @@
       return true;
    }
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean equals(Object other)
    {
       if (other instanceof SessionCreateQueueMessage == false)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -87,6 +87,11 @@
       return true;
    }
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean equals(Object other)
    {
       if (other instanceof SessionDeleteQueueMessage == false)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,6 +55,11 @@
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public long getProducerID()
    {
       return producerID;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -31,7 +31,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public class SessionProducerFlowCreditMessage extends PacketImpl
+public class SessionProducerFlowCreditMessage extends DuplicablePacket
 {
    // Constants -----------------------------------------------------
 
@@ -73,12 +73,14 @@
    
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putLong(producerID);
       buffer.putInt(credits);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       producerID = buffer.getLong();
       credits = buffer.getInt();
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -54,6 +54,11 @@
       return queueName;
    }
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putSimpleString(queueName);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
@@ -32,88 +32,94 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public class SessionQueueQueryResponseMessage extends PacketImpl
+public class SessionQueueQueryResponseMessage extends DuplicablePacket
 {
    private boolean exists;
-   
+
    private boolean durable;
-   
+
    private int consumerCount;
-   
+
    private int messageCount;
-   
+
    private SimpleString filterString;
-   
+
    private SimpleString address;
-   
-   public SessionQueueQueryResponseMessage(final boolean durable, 
-   		final int consumerCount, final int messageCount, final SimpleString filterString,
-   		final SimpleString address)
+
+   public SessionQueueQueryResponseMessage(final boolean durable,
+                                           final int consumerCount,
+                                           final int messageCount,
+                                           final SimpleString filterString,
+                                           final SimpleString address)
    {
-   	this(durable, consumerCount, messageCount, filterString, address, true);
+      this(durable, consumerCount, messageCount, filterString, address, true);
    }
-   
+
    public SessionQueueQueryResponseMessage()
    {
       this(false, 0, 0, null, null, false);
    }
-   
-   private SessionQueueQueryResponseMessage(final boolean durable, 
-   		final int consumerCount, final int messageCount, final SimpleString filterString, final SimpleString address,
-   		final boolean exists)
+
+   private SessionQueueQueryResponseMessage(final boolean durable,
+                                            final int consumerCount,
+                                            final int messageCount,
+                                            final SimpleString filterString,
+                                            final SimpleString address,
+                                            final boolean exists)
    {
       super(SESS_QUEUEQUERY_RESP);
-       
+
       this.durable = durable;
-      
+
       this.consumerCount = consumerCount;
-      
+
       this.messageCount = messageCount;
-      
+
       this.filterString = filterString;
-      
+
       this.address = address;
-      
-      this.exists = exists;      
+
+      this.exists = exists;
    }
-   
+
    public boolean isResponse()
    {
       return true;
    }
-      
+
    public boolean isExists()
    {
       return exists;
    }
-   
+
    public boolean isDurable()
    {
       return durable;
    }
-   
+
    public int getConsumerCount()
    {
       return consumerCount;
    }
-   
+
    public int getMessageCount()
    {
       return messageCount;
    }
-   
+
    public SimpleString getFilterString()
    {
       return filterString;
    }
-   
+
    public SimpleString getAddress()
    {
       return address;
    }
-   
+
    public void encodeBody(final MessagingBuffer buffer)
    {
+      super.encodeBody(buffer);
       buffer.putBoolean(exists);
       buffer.putBoolean(durable);
       buffer.putInt(consumerCount);
@@ -121,32 +127,34 @@
       buffer.putNullableSimpleString(filterString);
       buffer.putNullableSimpleString(address);
    }
-   
+
    public void decodeBody(final MessagingBuffer buffer)
    {
+      super.decodeBody(buffer);
       exists = buffer.getBoolean();
       durable = buffer.getBoolean();
       consumerCount = buffer.getInt();
       messageCount = buffer.getInt();
-      filterString  = buffer.getNullableSimpleString();
+      filterString = buffer.getNullableSimpleString();
       address = buffer.getNullableSimpleString();
    }
-   
+
    public boolean equals(Object other)
    {
       if (other instanceof SessionQueueQueryResponseMessage == false)
       {
          return false;
       }
-            
+
       SessionQueueQueryResponseMessage r = (SessionQueueQueryResponseMessage)other;
-      
+
       return super.equals(other) && this.exists == r.exists &&
-               this.durable == r.durable &&             
-               this.consumerCount == r.consumerCount &&
-               this.messageCount == r.messageCount &&
-               this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&
-               this.address == null ? r.address == null : this.address.equals(r.address);
+             this.durable == r.durable &&
+             this.consumerCount == r.consumerCount &&
+             this.messageCount == r.messageCount &&
+             // Each null-safe comparison is parenthesised: a bare ?: binds looser than && and would swallow the chain.
+             (this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString)) &&
+             (this.address == null ? r.address == null : this.address.equals(r.address));
    }
-   
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -91,6 +91,11 @@
    {      
       return true;
    }
+   
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
       
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -71,6 +71,11 @@
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public long getProducerID()
    {
       return producerID;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -72,6 +72,11 @@
 
    // Public --------------------------------------------------------
 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public long getProducerID()
    {
       return producerID;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -60,6 +60,11 @@
    }
 
    // Public --------------------------------------------------------
+ 
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
    
    public Xid getXid()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -62,6 +62,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean isFailed()
    {
       return failed;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -61,6 +61,11 @@
 
    // Public --------------------------------------------------------
  
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean isResponse()
    {
       return true;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,9 +55,13 @@
       super(SESS_XA_GET_TIMEOUT_RESP);
    }
    
-
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public boolean isResponse()
    {
       return true;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -63,7 +63,7 @@
    }
 
    // Public --------------------------------------------------------
-   
+
    public boolean isResponse()
    {
       return true;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,9 +55,13 @@
       super(SESS_XA_SET_TIMEOUT);
    }
    
-
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public int getTimeoutSeconds()
    {
       return this.timeoutSeconds;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
 
    // Public --------------------------------------------------------
    
+   public boolean isReHandleResponseOnFailure()
+   {
+      return true;
+   }
+   
    public Xid getXid()
    {
       return xid;

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server;
 
@@ -39,28 +39,28 @@
  */
 public interface ServerSession
 {
-	long getID();
-	
-	String getUsername();
-	
-	String getPassword();
-	
-	void removeBrowser(ServerBrowserImpl browser) throws Exception;
-	
-	void removeConsumer(ServerConsumer consumer) throws Exception;
-	
-	void removeProducer(ServerProducer producer) throws Exception;
-	
-	void close() throws Exception;
-	
-	void setStarted(boolean started) throws Exception;
-	
-	void promptDelivery(Queue queue);
-	
-	void send(ServerMessage msg) throws Exception;
-		
-	void processed(final long consumerID, final long messageID) throws Exception;
-	
+   long getID();
+
+   String getUsername();
+
+   String getPassword();
+
+   void removeBrowser(ServerBrowserImpl browser) throws Exception;
+
+   void removeConsumer(ServerConsumer consumer) throws Exception;
+
+   void removeProducer(ServerProducer producer) throws Exception;
+
+   void close() throws Exception;
+
+   void setStarted(boolean started) throws Exception;
+
+   void promptDelivery(Queue queue);
+
+   void send(ServerMessage msg) throws Exception;
+
+   void processed(final long consumerID, final long messageID) throws Exception;
+
    void rollback() throws Exception;
 
    void commit() throws Exception;
@@ -82,7 +82,7 @@
    SessionXAResponseMessage XAStart(Xid xid);
 
    SessionXAResponseMessage XASuspend() throws Exception;
-
+   
    List<Xid> getInDoubtXids() throws Exception;
 
    int getXATimeout();
@@ -93,41 +93,59 @@
 
    void removeDestination(SimpleString address, boolean durable) throws Exception;
 
-   void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString,
-                    boolean durable, boolean temporary) throws Exception;
+   void createQueue(SimpleString address,
+                    SimpleString queueName,
+                    SimpleString filterString,
+                    boolean durable,
+                    boolean temporary) throws Exception;
 
    void deleteQueue(SimpleString queueName) throws Exception;
 
-   SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName, SimpleString filterString,
-   		                                              int windowSize, int maxRate) throws Exception;
-   
-   SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate, boolean autoGroupId) throws Exception;
+   SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
+                                                       SimpleString filterString,
+                                                       int windowSize,
+                                                       int maxRate) throws Exception;
 
+//   SessionCreateConsumerResponseMessage recreateConsumer(SimpleString queueName,
+//                                                         SimpleString filterString,
+//                                                         int windowSize,
+//                                                         int maxRate) throws Exception;
+
+   SessionCreateProducerResponseMessage createProducer(SimpleString address,
+                                                       int windowSize,
+                                                       int maxRate,
+                                                       boolean autoGroupId) throws Exception;
+
+//   SessionCreateProducerResponseMessage recreateProducer(SimpleString address,
+//                                                         int windowSize,
+//                                                         int maxRate,
+//                                                         boolean autoGroupId) throws Exception;
+
    SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
 
    SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
 
    void createBrowser(SimpleString queueName, SimpleString filterString) throws Exception;
-   
+
    void closeConsumer(long consumerID) throws Exception;
-   
+
    void closeProducer(long producerID) throws Exception;
-   
+
    void closeBrowser(long browserID) throws Exception;
-   
+
    void receiveConsumerCredits(long consumerID, int credits) throws Exception;
-   
+
    void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
-   
+
    boolean browserHasNextMessage(long browserID) throws Exception;
-   
+
    ServerMessage browserNextMessage(long browserID) throws Exception;
-   
+
    void browserReset(long browserID) throws Exception;
-   
+
    int transferConnection(RemotingConnection newConnection);
-   
+
    void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
-   
+
    void failedOver() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -118,4 +118,8 @@
 
       channel1.send(response);
    }
+//   
+//   public void rehandlePacket(final Packet packet)
+//   {      
+//   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -463,7 +463,7 @@
 
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
-
+   
    public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
    {
       if (tx != null && tx.getXid().equals(xid))
@@ -504,7 +504,7 @@
 
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
-
+   
    public SessionXAResponseMessage XAForget(final Xid xid)
    {
       // Do nothing since we don't support heuristic commits / rollback from the
@@ -907,6 +907,34 @@
 
       return response;
    }
+   
+//   public SessionCreateConsumerResponseMessage recreateConsumer(final SimpleString queueName,
+//                                                              final SimpleString filterString,
+//                                                              int windowSize,
+//                                                              int maxRate) throws Exception
+//   {
+//      Binding binding = postOffice.getBinding(queueName);
+//
+//      if (binding == null)
+//      {
+//         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+//      }
+//
+//      securityStore.check(binding.getAddress(), CheckType.READ, this);
+//
+//      // Flow control values if specified on queue override those passed in from
+//      // client
+//
+//      QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+//
+//      Integer queueWindowSize = qs.getConsumerWindowSize();
+//
+//      windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+//
+//      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
+//
+//      return response;
+//   }
 
    public SessionQueueQueryResponseMessage executeQueueQuery(final SimpleString queueName) throws Exception
    {
@@ -1034,6 +1062,36 @@
       }
       return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
    }
+   
+//   public SessionCreateProducerResponseMessage recreateProducer(final SimpleString address,
+//                                                              final int windowSize,
+//                                                              final int maxRate,
+//                                                              final boolean autoGroupId) throws Exception
+//   {
+//      FlowController flowController = null;
+//
+//      final int maxRateToUse = maxRate;
+//
+//      if (address != null)
+//      {
+//         flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+//      }
+//
+//      final int windowToUse = flowController == null ? -1 : windowSize;
+//      
+//      // Get some initial credits to send to the producer - we try for
+//      // windowToUse
+//
+//      int initialCredits = flowController == null ? -1 : windowToUse;
+//
+//      SimpleString groupId = null;
+//      if(autoGroupId)
+//      {
+//         groupId = simpleStringIdGenerator.generateID();
+//      }
+//      
+//      return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
+//   }
 
    public boolean browserHasNextMessage(final long browserID) throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -51,6 +51,7 @@
 
 import java.util.List;
 
+import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.exception.MessagingException;
@@ -89,6 +90,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
@@ -425,4 +427,246 @@
          channel.send(response);
       }
    }
+   
+//   public void rehandlePacket(final Packet packet)
+//   {      
+//      Packet response = null;
+//      try
+//      {
+//         byte type = packet.getType();
+//                        
+//         switch (type)
+//         {
+//            case SESS_CREATECONSUMER:
+//            {
+//               SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+//               response = session.recreateConsumer(request.getQueueName(),
+//                                                   request.getFilterString(),
+//                                                   request.getWindowSize(),
+//                                                   request.getMaxRate());
+//               break;
+//            }
+//            case SESS_CREATEQUEUE:
+//            {
+//               response = new NullResponseMessage(true, true);
+//               break;
+//            }
+//            case SESS_DELETE_QUEUE:
+//            {
+//               response = new NullResponseMessage(true, true);
+//               break;
+//            }
+//            case SESS_QUEUEQUERY:
+//            {
+//               SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
+//               response = session.executeQueueQuery(request.getQueueName());
+//               break;
+//            }
+//            case SESS_BINDINGQUERY:
+//            {
+//               SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
+//               response = session.executeBindingQuery(request.getAddress());
+//               break;
+//            }
+//            case SESS_CREATEBROWSER:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_CREATEPRODUCER:
+//            {
+//               SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
+//               response = session.recreateProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
+//               break;
+//            }
+//            case SESS_COMMIT:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_ROLLBACK:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_XA_COMMIT:
+//            {
+//               SessionXACommitMessage message = (SessionXACommitMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_END:
+//            {
+//               SessionXAEndMessage message = (SessionXAEndMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_FORGET:
+//            {
+//               SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_JOIN:
+//            {
+//               SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_RESUME:
+//            {
+//               SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_ROLLBACK:
+//            {
+//               SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_START:
+//            {
+//               SessionXAStartMessage message = (SessionXAStartMessage)packet;
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_SUSPEND:
+//            {
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_PREPARE:
+//            {
+//               SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
+//               //FIXME - what if the xa operation didn't succeed last time round?
+//               //Need to return an xa error code then.
+//               //So we need to store the state of the last operation
+//               
+//               ok ok it's simpler than this!!!
+//               
+//               no need for having reversions of everything.
+//               
+//               Just store the last response in the server session packet handler and send back that.
+//               
+//               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+//               break;
+//            }
+//            case SESS_XA_INDOUBT_XIDS:
+//            {
+//               List<Xid> xids = session.getInDoubtXids();
+//               response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+//               break;
+//            }
+//            case SESS_XA_GET_TIMEOUT:
+//            {
+//               response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+//               break;
+//            }
+//            case SESS_XA_SET_TIMEOUT:
+//            {
+//               SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
+//               response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message.getTimeoutSeconds()));
+//               break;
+//            }
+//            case SESS_ADD_DESTINATION:
+//            {
+//               response = new NullResponseMessage(true, true);
+//               break;
+//            }
+//            case SESS_REMOVE_DESTINATION:
+//            {
+//               response = new NullResponseMessage(true, true);
+//               break;
+//            }
+//            case SESS_STOP:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_CLOSE:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_CONSUMER_CLOSE:
+//            {
+//               response = new NullResponseMessage(true, true);
+//               break;
+//            }
+//            case SESS_PRODUCER_CLOSE:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_BROWSER_CLOSE:
+//            {
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            case SESS_SEND:
+//            {
+//               SessionSendMessage message = (SessionSendMessage)packet;            
+//               if (message.isRequiresResponse())
+//               {
+//                  response = new NullResponseMessage(false, true);
+//               }
+//               break;
+//            }
+//            case SESS_BROWSER_HASNEXTMESSAGE:
+//            {
+//               //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+//               //with it for now
+//               SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;
+//               response = new SessionBrowserHasNextMessageResponseMessage(session.browserHasNextMessage(message.getBrowserID()));
+//               break;
+//            }
+//            case SESS_BROWSER_NEXTMESSAGE:
+//            {
+//               //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+//               //with it for now
+//               SessionBrowserNextMessageMessage message = (SessionBrowserNextMessageMessage)packet;
+//               ServerMessage smsg = session.browserNextMessage(message.getBrowserID());
+//               response = new SessionBrowseMessage(smsg);
+//               break;
+//            }
+//            case SESS_BROWSER_RESET:
+//            {
+//               //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+//               //with it for now
+//               SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
+//               session.browserReset(message.getBrowserID());
+//               response = new NullResponseMessage(false, true);
+//               break;
+//            }
+//            default:
+//            {
+//               response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+//                                                                               "Unsupported packet on resend " + type));
+//            }
+//         }
+//      }
+//      catch (Throwable t)
+//      {
+//         MessagingException me;
+//
+//         log.error("Caught unexpected exception", t);
+//
+//         if (t instanceof MessagingException)
+//         {
+//            me = (MessagingException)t;
+//         }
+//         else
+//         {
+//            me = new MessagingException(MessagingException.INTERNAL_ERROR);
+//         }
+//
+//         response = new MessagingExceptionMessage(me);
+//      }
+//      
+//      if (response != null)
+//      {
+//         channel.send(response);
+//      }
+//   }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java	2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java	2008-10-03 18:27:14 UTC (rev 5066)
@@ -121,8 +121,7 @@
          
          ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                 new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                           backupParams));
-         
+                                                                                           backupParams));         
          do
          {         
             testA(sf);
@@ -133,6 +132,30 @@
       }
    }
    
+   public void testAA(final ClientSessionFactory sf) throws Exception
+   { // Scenario: open and close many consumers on a single session, then tear everything down.
+      ClientSession s = sf.createSession(false, false, false, false);
+      // ADDRESS is passed as both leading arguments (presumably address and queue name -- TODO confirm).
+      s.createQueue(ADDRESS, ADDRESS, null, false, false);
+      // NOTE(review): presumably lets the failer act on this session while the loop runs -- confirm.
+      failer.session = s;
+                              
+      final int numConsumers = 100;
+      // Each consumer is closed immediately after creation; no messages are sent or received here.
+      for (int i = 0; i < numConsumers; i++)
+      {
+         ClientConsumer consumer = s.createConsumer(ADDRESS);
+         
+         consumer.close();
+      }
+      // Cleanup: remove the queue created above, then close the session.
+      s.deleteQueue(ADDRESS);
+      
+      s.close();
+      
+      log.info("done");
+   }
+   
    public void testA(final ClientSessionFactory sf) throws Exception
    {      
       long start = System.currentTimeMillis();




More information about the jboss-cvs-commits mailing list