[jboss-cvs] JBoss Messaging SVN: r5162 - in trunk: src/main/org/jboss/messaging/core/remoting and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 21 06:39:10 EDT 2008


Author: timfox
Date: 2008-10-21 06:39:09 -0400 (Tue, 21 Oct 2008)
New Revision: 5162

Modified:
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
More session replication and failover work, plus assorted fixes


Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -69,7 +69,7 @@
    
    List<Binding> getBindingsForAddress(SimpleString address) throws Exception;
    
-   Binding getBinding(SimpleString queueName) throws Exception;
+   Binding getBinding(SimpleString queueName);
       
    List<MessageReference> route(ServerMessage message) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -11,6 +11,8 @@
  */
 package org.jboss.messaging.core.remoting;
 
+import java.util.Queue;
+
 import org.jboss.messaging.core.exception.MessagingException;
 
 /**
@@ -49,4 +51,24 @@
    void unlock();
 
    void interruptBlocking();
+   
+   //debug only
+   Queue<Command> getSentCommands();
+   
+   Queue<Command> getReceivedCommands();
+   
+   // For debug only
+   static class Command
+   {
+      public final int commandID;
+
+      public final Packet packet;
+
+      public Command(final int commandID, final Packet packet)
+      {
+         this.commandID = commandID;
+
+         this.packet = packet;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -100,9 +100,6 @@
                                                                     callTimeout,
                                                                     pingInterval,                                                             
                                                                     pingExecutor,
-                                                                    null,
-                                                                    null,
-                                                                    true,
                                                                     null);        
 
          handler.conn = connection;
@@ -143,9 +140,6 @@
                                                                  callTimeout,
                                                                  pingInterval,                                                            
                                                                  pingExecutor,
-                                                                 null,
-                                                                 null,
-                                                                 true,
                                                                  null);        
 
       handler.conn = connection;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -34,7 +34,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUECOPY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -69,6 +68,9 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
 
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -196,6 +198,8 @@
 
    private volatile boolean active;
 
+   private final boolean client;
+
    private final long pingPeriod;
 
    private final ScheduledExecutorService pingExecutor;
@@ -210,27 +214,204 @@
    private final ReadWriteLock readWriteLock;
 
    private final Object transferLock = new Object();
-   
-   private static volatile boolean debug;
-   
+
+   // debug only stuff
+
+   private boolean createdActive;
+
+   public static volatile boolean debug = false;
+
+   private static final int MAX_DEBUG_CACHE_SIZE = 1000;
+
+   private static final Map<Long, Channel> serverLiveChannels = new LinkedHashMap<Long, Channel>();
+
+   private static final Map<Long, Channel> serverBackupChannels = new LinkedHashMap<Long, Channel>();
+
+   private static final Map<Long, Channel> clientChannels = new LinkedHashMap<Long, Channel>();
+
    private static void report()
    {
+      log.info("=================================================================");
+      log.info("Report on channel history");
+      log.info("-------------------------");
+      log.info(" ");
+      log.info("Server--->Client traffic");
+      log.info("------------------------");
+      for (Channel serverBackupChannel : serverBackupChannels.values())
+      {
+         Channel serverLiveChannel = serverLiveChannels.get(serverBackupChannel.getID());
+
+         Channel clientChannel = clientChannels.get(serverBackupChannel.getID());
+
+         log.info("Channel id: " + serverBackupChannel.getID());
+         log.info("Backup     Live     Client");
+         Iterator<Channel.Command> backup = serverBackupChannel.getSentCommands().iterator();
+         Iterator<Channel.Command> live = serverLiveChannel == null ? null : serverLiveChannel.getSentCommands()
+                                                                                              .iterator();
+         Iterator<Channel.Command> client = clientChannel == null ? null : clientChannel.getReceivedCommands()
+                                                                                        .iterator();
+
+         while (backup.hasNext() || (live != null && live.hasNext()) || (client != null && client.hasNext()))
+         {
+            String line = "";
+            byte bkpPacketType = -1;
+            byte livePacketType = -1;
+            byte clientPacketType = -1;
+            if (backup.hasNext())
+            {
+               Channel.Command backupCommand = backup.next();
+               line += "id:" + backupCommand.commandID + " type:" + backupCommand.packet.getType() + ", ";
+               bkpPacketType = backupCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            if (live != null && live.hasNext())
+            {
+               Channel.Command liveCommand = live.next();
+               line += "id:" + liveCommand.commandID + " type:" + liveCommand.packet.getType() + ", ";
+               livePacketType = liveCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            if (client != null && client.hasNext())
+            {
+               Channel.Command clientCommand = client.next();
+               line += "id:" + clientCommand.commandID + " type:" + clientCommand.packet.getType() + ", ";
+               clientPacketType = clientCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            log.info(line);
+            if (bkpPacketType != -1 && livePacketType != -1 && clientPacketType != -1 &&
+                     (bkpPacketType != livePacketType || livePacketType != clientPacketType))
+            {
+               log.info("*** ERROR Packets in wrong sequence ***");
+            }
+         }
+      }
       
+      log.info("Client--->Server traffic");
+      log.info("------------------------");
+      for (Channel serverBackupChannel : serverBackupChannels.values())
+      {
+         Channel serverLiveChannel = serverLiveChannels.get(serverBackupChannel.getID());
+
+         Channel clientChannel = clientChannels.get(serverBackupChannel.getID());
+
+         log.info("Channel id: " + serverBackupChannel.getID());
+         log.info("Backup     Live     Client");
+         Iterator<Channel.Command> backup = serverBackupChannel.getReceivedCommands().iterator();
+         Iterator<Channel.Command> live = serverLiveChannel == null ? null : serverLiveChannel.getReceivedCommands()
+                                                                                              .iterator();
+         Iterator<Channel.Command> client = clientChannel == null ? null : clientChannel.getSentCommands()
+                                                                                        .iterator();
+
+         while (backup.hasNext() || (live != null && live.hasNext()) || (client != null && client.hasNext()))
+         {
+            String line = "";
+            byte bkpPacketType = -1;
+            byte livePacketType = -1;
+            byte clientPacketType = -1;
+            if (backup.hasNext())
+            {
+               Channel.Command backupCommand = backup.next();
+               line += "id:" + backupCommand.commandID + " type:" + backupCommand.packet.getType() + ", ";
+               bkpPacketType = backupCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            if (live != null && live.hasNext())
+            {
+               Channel.Command liveCommand = live.next();
+               line += "id:" + liveCommand.commandID + " type:" + liveCommand.packet.getType() + ", ";
+               livePacketType = liveCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            if (client != null && client.hasNext())
+            {
+               Channel.Command clientCommand = client.next();
+               line += "id:" + clientCommand.commandID + " type:" + clientCommand.packet.getType() + ", ";
+               clientPacketType = clientCommand.packet.getType();
+            }
+            else
+            {
+               line += "--------------";
+            }
+            log.info(line);
+            if (bkpPacketType != -1 && livePacketType != -1 && clientPacketType != -1 &&
+                     (bkpPacketType != livePacketType || livePacketType != clientPacketType))
+            {
+               log.info("*** ERROR Packets in wrong sequence ***");
+            }
+         }
+      }
+      
+      log.info("End of report====================================================");
    }
 
+   // end debug only stuff
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
+   /*
+    * Create a client side connection
+    */
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final long blockingCallTimeout,
                                  final long pingPeriod,
                                  final ScheduledExecutorService pingExecutor,
+                                 final List<Interceptor> interceptors)
+   {
+      this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, null, true);
+   }
+
+   /*
+    * Create a server side connection
+    */
+   public RemotingConnectionImpl(final Connection transportConnection,
+                                 final long blockingCallTimeout,
+                                 final long pingPeriod,
+                                 final ScheduledExecutorService pingExecutor,
                                  final List<Interceptor> interceptors,
                                  final RemotingConnection replicatingConnection,
                                  final boolean active,
                                  final ReadWriteLock readWriteLock)
 
    {
+      this(transportConnection,
+           blockingCallTimeout,
+           pingPeriod,
+           pingExecutor,
+           interceptors,
+           replicatingConnection,
+           active,
+           readWriteLock,
+           false);
+   }
+
+   private RemotingConnectionImpl(final Connection transportConnection,
+                                  final long blockingCallTimeout,
+                                  final long pingPeriod,
+                                  final ScheduledExecutorService pingExecutor,
+                                  final List<Interceptor> interceptors,
+                                  final RemotingConnection replicatingConnection,
+                                  final boolean active,
+                                  final ReadWriteLock readWriteLock,
+                                  final boolean client)
+
+   {
       this.transportConnection = transportConnection;
 
       this.blockingCallTimeout = blockingCallTimeout;
@@ -253,6 +434,10 @@
       final ChannelHandler ppHandler = new PingPongHandler();
 
       pingChannel.setHandler(ppHandler);
+
+      this.client = client;
+
+      this.createdActive = active;
    }
 
    public void startPinger()
@@ -290,6 +475,25 @@
          channel = new ChannelImpl(this, channelID, packetConfirmationBatchSize, interruptBlockOnFailure);
 
          channels.put(channelID, channel);
+
+         if (debug)
+         {
+            if (client)
+            {
+               clientChannels.put(channelID, channel);
+            }
+            else
+            {
+               if (createdActive)
+               {
+                  serverLiveChannels.put(channelID, channel);
+               }
+               else
+               {
+                  serverBackupChannels.put(channelID, channel);
+               }
+            }
+         }
       }
 
       return channel;
@@ -407,8 +611,8 @@
 
       final long channelID = packet.getChannelID();
 
-      //FIXME - need to redo global ordering since this won't work with multiple connections
-      //Instead use lastSeq technique
+      // FIXME - need to redo global ordering since this won't work with multiple connections
+      // Instead use lastSeq technique
 
       final boolean useLock = readWriteLock != null;
 
@@ -856,6 +1060,14 @@
 
       private final Queue<DelayedResult> responseActions = new ConcurrentLinkedQueue<DelayedResult>();
 
+      // debug stuff
+
+      private final Queue<Command> receivedCommandsCache;
+
+      private final Queue<Command> sentCommandsCache;
+
+      // end debug stuff
+
       private ChannelImpl(final RemotingConnectionImpl connection,
                           final long id,
                           final int packetConfirmationBatchSize,
@@ -893,6 +1105,19 @@
          }
 
          this.interruptBlockOnFailure = interruptBlockOnFailure;
+
+         if (debug)
+         {
+            this.sentCommandsCache = new LinkedList<Command>();
+
+            this.receivedCommandsCache = new LinkedList<Command>();
+         }
+         else
+         {
+            this.sentCommandsCache = null;
+
+            this.receivedCommandsCache = null;
+         }
       }
 
       public long getID()
@@ -1201,6 +1426,16 @@
          lock.unlock();
       }
 
+      public Queue<Command> getSentCommands()
+      {
+         return this.sentCommandsCache;
+      }
+
+      public Queue<Command> getReceivedCommands()
+      {
+         return this.receivedCommandsCache;
+      }
+
       // we need to do a thorough investigation of how packets confirmed get
       //
       // a) replicated from client through live to backup without dealing with on live
@@ -1208,6 +1443,18 @@
 
       private void handlePacket(final Packet packet)
       {
+         if (debug)
+         {
+            int commandID = packet.isRequiresConfirmations() ? lastReceivedCommandID + 1 : -1;
+
+            receivedCommandsCache.add(new Command(commandID, packet));
+
+            if (receivedCommandsCache.size() == MAX_DEBUG_CACHE_SIZE)
+            {
+               receivedCommandsCache.poll();
+            }
+         }
+
          if (packet.getType() == PACKETS_CONFIRMED)
          {
             if (resendCache != null)
@@ -1294,13 +1541,18 @@
                nextConfirmation += packetConfirmationBatchSize;
 
                confirmed.setChannelID(id);
+               
+               if (debug)
+               {
+                  this.sentCommandsCache.add(new Command(-1, confirmed));
+               }
 
                connection.doWrite(confirmed);
             }
          }
       }
 
-      // private volatile int lastSentID;
+      private volatile int lastSentID;
 
       private void addToCache(final Packet packet)
       {
@@ -1308,12 +1560,22 @@
          {
             resendCache.add(packet);
          }
+
+         if (debug && packet.getType() != PacketImpl.SESS_REPLICATE_DELIVERY)
+         {
+            sentCommandsCache.add(new Command(lastSentID++, packet));
+
+            if (sentCommandsCache.size() == MAX_DEBUG_CACHE_SIZE)
+            {
+               sentCommandsCache.poll();
+            }
+         }
       }
 
       private void clearUpTo(final int lastReceivedCommandID)
       {
          final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-        
+
          if (numberToClear == -1)
          {
             throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
@@ -1325,8 +1587,8 @@
 
             if (packet == null)
             {
-               throw new IllegalStateException(System.identityHashCode(this) + 
-                                               " Can't find packet to clear: " +
+               report();
+               throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
                                                " last received command id " +
                                                lastReceivedCommandID +
                                                " first stored command id " +
@@ -1334,7 +1596,11 @@
                                                " cache size " +
                                                this.resendCache.size() +
                                                " channel id " +
-                                               id);
+                                               id +
+                                               " client " +
+                                               connection.client +
+                                               " created active " +
+                                               connection.createdActive);
             }
          }
 
@@ -1347,6 +1613,10 @@
          {
             if (packet.getType() == PACKETS_CONFIRMED)
             {
+               if (debug)
+               {
+                  sentCommandsCache.add(new Command(-1, packet));
+               }
                connection.doWrite(packet);
             }
             else if (packet.getType() == REPLICATION_RESPONSE)
@@ -1415,4 +1685,5 @@
          }
       }
    }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -25,6 +25,8 @@
 
 import java.util.List;
 
+import org.jboss.messaging.core.remoting.Packet;
+
 /**
  * 
  * A ServerConsumer
@@ -36,11 +38,13 @@
 {
 	long getID();
 	
+	void handleClose(Packet packet);
+	
 	void close() throws Exception;
 
 	List<MessageReference> cancelRefs() throws Exception;
 	
-	void setStarted(boolean started) throws Exception;
+	void setStarted(boolean started);
 	
 	void receiveCredits(int credits) throws Exception;
 	
@@ -51,4 +55,8 @@
 	void failedOver();
 	
 	void deliverReplicated(final long messageID) throws Exception;
+	
+	void lock();
+	
+	void unlock();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -22,18 +22,35 @@
 
 package org.jboss.messaging.core.server;
 
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 
-import javax.transaction.xa.Xid;
-import java.util.List;
-
 /**
  *
  * A ServerSession
@@ -55,87 +72,84 @@
    void removeProducer(ServerProducer producer) throws Exception;
 
    void close() throws Exception;
-
-   void setStarted(boolean started) throws Exception;
-
+   
    void promptDelivery(Queue queue);
 
    void send(ServerMessage msg) throws Exception;
 
    void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
 
-   void acknowledge(final long consumerID, final long messageID) throws Exception;
+   void handleAcknowledge(final SessionAcknowledgeMessage packet);
 
-   void rollback() throws Exception;
+   void handleRollback(Packet packet);
 
-   void commit() throws Exception;
+   void handleCommit(Packet packet);
 
-   SessionXAResponseMessage XACommit(boolean onePhase, Xid xid) throws Exception;
+   void handleXACommit(SessionXACommitMessage packet);
 
-   SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception;
+   void handleXAEnd(SessionXAEndMessage packet);
 
-   SessionXAResponseMessage XAForget(Xid xid);
+   void handleXAForget(SessionXAForgetMessage packet);
 
-   SessionXAResponseMessage XAJoin(Xid xid) throws Exception;
+   void handleXAJoin(SessionXAJoinMessage packet);
 
-   SessionXAResponseMessage XAPrepare(Xid xid) throws Exception;
+   void handleXAPrepare(SessionXAPrepareMessage packet);
 
-   SessionXAResponseMessage XAResume(Xid xid) throws Exception;
+   void handleXAResume(SessionXAResumeMessage packet);
 
-   SessionXAResponseMessage XARollback(Xid xid) throws Exception;
+   void handleXARollback(SessionXARollbackMessage packet);
 
-   SessionXAResponseMessage XAStart(Xid xid);
+   void handleXAStart(SessionXAStartMessage packet);
 
-   SessionXAResponseMessage XASuspend() throws Exception;
+   void handleXASuspend(Packet packet);
 
-   List<Xid> getInDoubtXids() throws Exception;
+   void handleGetInDoubtXids(Packet packet);
 
-   int getXATimeout();
+   void handleGetXATimeout(Packet packet);
 
-   boolean setXATimeout(int timeoutSeconds);
+   void handleSetXATimeout(SessionXASetTimeoutMessage packet);
 
-   void addDestination(SimpleString address, boolean durable, boolean temporary) throws Exception;
+   void handleAddDestination(SessionAddDestinationMessage packet);
+   
+   void handleStart(Packet packet);
+   
+   void handleStop(Packet packet);
 
-   void removeDestination(SimpleString address, boolean durable) throws Exception;
+   void handleRemoveDestination(SessionRemoveDestinationMessage packet);
 
-   void createQueue(SimpleString address,
-                    SimpleString queueName,
-                    SimpleString filterString,
-                    boolean durable,
-                    boolean temporary) throws Exception;
+   void handleCreateQueue(SessionCreateQueueMessage packet);
   
-   void deleteQueue(SimpleString queueName) throws Exception;
+   void handleDeleteQueue(SessionDeleteQueueMessage packet);
 
-   SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
-                                                       SimpleString filterString,
-                                                       int windowSize,
-                                                       int maxRate,
-                                                       boolean browseOnly) throws Exception;
+   void handleCreateConsumer(SessionCreateConsumerMessage packet);
 
-   SessionCreateProducerResponseMessage createProducer(SimpleString address,
-                                                       int windowSize,
-                                                       int maxRate,
-                                                       boolean autoGroupId) throws Exception;
+   void handleCreateProducer(SessionCreateProducerMessage packet);
 
-   SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
+   void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
 
-   SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
+   void handleExecuteBindingQuery(SessionBindingQueryMessage packet);
 
-   void closeConsumer(long consumerID) throws Exception;
+   void handleCloseConsumer(SessionConsumerCloseMessage packet);
 
-   void closeProducer(long producerID) throws Exception;
+   void handleCloseProducer(SessionProducerCloseMessage packet);
 
-   void receiveConsumerCredits(long consumerID, int credits) throws Exception;
+   void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
 
-   void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
+   void handleSendProducerMessage(SessionSendMessage packet);
 
-   void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+   void handleSendScheduledProducerMessage(SessionScheduledSendMessage packet);
 
+   void handleManagementMessage(SessionSendManagementMessage packet);
+
+   void handleFailedOver(Packet packet);
+   
+   void handleClose(Packet packet);
+   
+   void handleReplicatedDelivery(SessionReplicateDeliveryMessage packet);
+   
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
 
-   void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
+   //Should this really be here??
+   void sendResponse(final DelayedResult result, final Packet response);
 
-   void failedOver() throws Exception;
-   
-   void handleReplicatedDelivery(long consumerID, long messageID) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -21,13 +21,12 @@
  */
 package org.jboss.messaging.core.server.impl;
 
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.ServerMessage;
-
 import java.util.ArrayList;
 import java.util.List;
 
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -483,7 +483,7 @@
       // to the backup, so we need to work in both cases
       if (sessions.putIfAbsent(name, session) == null)
       {
-         ChannelHandler handler = new ServerSessionPacketHandler(session, channel, storageManager);
+         ChannelHandler handler = new ServerSessionPacketHandler(session, channel);
 
          channel.setHandler(handler);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -203,7 +203,7 @@
    }
 
    
-   public synchronized void addConsumer(final Consumer consumer)
+   public void addConsumer(final Consumer consumer)
    {
       distributionPolicy.addConsumer(consumer);
    }
@@ -212,11 +212,6 @@
    {
       boolean removed = distributionPolicy.removeConsumer(consumer);
 
-      if (removed)
-      {
-         distributionPolicy.removeConsumer(consumer);
-      }
-
       if (!distributionPolicy.hasConsumers())
       {
          promptDelivery = false;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -39,19 +39,24 @@
 
    protected int pos = 0;
 
-
-   public synchronized void addConsumer(Consumer consumer)
+   @Override
+   public synchronized void addConsumer(final Consumer consumer)
    {
       pos = 0;
       super.addConsumer(consumer);
    }
 
-   public synchronized boolean removeConsumer(Consumer consumer)
+   @Override
+   public synchronized boolean removeConsumer(final Consumer consumer)
    {
-
       pos = 0;
       return super.removeConsumer(consumer);
    }
+   
+   public synchronized int getConsumerCount()
+   {
+      return super.getConsumerCount();
+   }
 
    public HandleStatus distribute(final MessageReference reference)
    {
@@ -90,7 +95,7 @@
       }
    }
 
-   protected Consumer getNextConsumer()
+   protected synchronized Consumer getNextConsumer()
    {
       Consumer consumer = consumers.get(pos);
       incrementPosition();
@@ -106,7 +111,7 @@
       }
    }
 
-   protected HandleStatus handle(MessageReference reference, Consumer consumer)
+   protected HandleStatus handle(final MessageReference reference, final Consumer consumer)
    {
       HandleStatus status;
       try

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -22,12 +22,23 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -39,11 +50,6 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Concrete implementation of a ClientConsumer.
  *
@@ -75,7 +81,7 @@
 
    private final ServerSession session;
 
-   private final Object startStopLock = new Object();
+   private final Lock lock = new ReentrantLock();
 
    private final AtomicInteger availableCredits;
 
@@ -84,7 +90,7 @@
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
-   private boolean browseOnly;
+   private final boolean browseOnly;
 
    private final StorageManager storageManager;
 
@@ -168,9 +174,12 @@
 
          return HandleStatus.HANDLED;
       }
-
-      synchronized (startStopLock)
+   
+      lock.lock();
+      
+      try
       {
+      
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
          // queue for delivery later.
@@ -217,36 +226,73 @@
 
          return HandleStatus.HANDLED;
       }
+      finally
+      {
+         lock.unlock();
+      }
    }
-
-   public void close() throws Exception
+   
+   public void handleClose(final Packet packet)
    {
-      browseOnly = false;
+      DelayedResult result = null;
 
-      setStarted(false);
-
-      messageQueue.removeConsumer(this);
-
-      session.removeConsumer(this);
-
-      LinkedList<MessageReference> refs = cancelRefs();
-
-      Iterator<MessageReference> iter = refs.iterator();
-
-      while (iter.hasNext())
+      Packet response = null;
+                  
+      try
+      {               
+         lock.lock();
+         try
+         {
+            setStarted(false);
+         }
+         finally
+         {
+            lock.unlock();
+         }
+         
+         //We must stop delivery before replicating the packet, this ensures the close message gets processed
+         //and replicated on the backup in the same order as any delivery that might be occurring gets
+         //processed and replicated on the backup.
+         //Otherwise we could end up with a situation where a close comes in, then a delivery comes in,
+         //then close gets replicated to backup, then delivery gets replicated, but consumer is already
+         //closed!
+         
+         result = channel.replicatePacket(packet);
+         
+         doClose();
+         
+         response = new NullResponseMessage();
+      }
+      catch (Exception e)
       {
-         MessageReference ref = iter.next();
-
-         if (!ref.cancel(storageManager, postOffice, queueSettingsRepository))
+         log.error("Failed to close consumer", e);
+         
+         if (e instanceof MessagingException)
          {
-            iter.remove();
+            response = new MessagingExceptionMessage((MessagingException)e);
          }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
-
-      if (!refs.isEmpty())
+   
+      session.sendResponse(result, response);
+   }
+     
+   public void close() throws Exception
+   {
+      lock.lock();
+      try
       {
-         messageQueue.addListFirst(refs);
+         setStarted(false);
       }
+      finally
+      {
+         lock.unlock();
+      }
+
+      doClose();
    }
 
    public LinkedList<MessageReference> cancelRefs() throws Exception
@@ -268,11 +314,8 @@
 
    public void setStarted(final boolean started)
    {
-      synchronized (startStopLock)
-      {
-         this.started = browseOnly || started;
-      }
-
+      this.started = browseOnly || started;
+      
       // Outside the lock
       if (started)
       {
@@ -355,6 +398,16 @@
          }
       }
    }
+   
+   public void lock()
+   {
+      lock.lock();
+   }
+   
+   public void unlock()
+   {
+      lock.unlock();
+   }
 
    // Public
    // -----------------------------------------------------------------------------
@@ -362,6 +415,32 @@
    // Private
    // --------------------------------------------------------------------------------------
 
+   private void doClose() throws Exception
+   {
+      messageQueue.removeConsumer(this);
+
+      session.removeConsumer(this);
+
+      LinkedList<MessageReference> refs = cancelRefs();
+
+      Iterator<MessageReference> iter = refs.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+
+         if (!ref.cancel(storageManager, postOffice, queueSettingsRepository))
+         {
+            iter.remove();
+         }
+      }
+
+      if (!refs.isEmpty())
+      {
+         messageQueue.addListFirst(refs);
+      }
+   }
+   
    private void promptDelivery()
    {
       session.promptDelivery(messageQueue);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -41,15 +41,46 @@
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.MessageReference;
@@ -68,7 +99,6 @@
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.SimpleStringIdGenerator;
-import org.jboss.messaging.util.UUIDGenerator;
 
 /*
  * Session implementation 
@@ -235,31 +265,9 @@
       }
    }
 
-   public void setStarted(final boolean s) throws Exception
-   {
-      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
-      for (ServerConsumer consumer : consumersClone)
-      {
-         consumer.setStarted(s);
-      }
-
-      started = s;
-   }
-
-   public void failedOver() throws Exception
-   {
-      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
-      for (ServerConsumer consumer : consumersClone)
-      {
-         consumer.failedOver();
-      }
-   }
-
    public void close() throws Exception
    {
-      rollback(false);
+      rollback();
 
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
@@ -349,723 +357,1506 @@
       }
    }
 
-   public void acknowledge(final long consumerID, final long messageID) throws Exception
+   public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
    {
-      MessageReference ref = consumers.get(consumerID).getReference(messageID);
-      
-      //Null implies a browser
-      if (ref != null)
+      SimpleString queueName = packet.getQueueName();
+
+      SimpleString filterString = packet.getFilterString();
+
+      int windowSize = packet.getWindowSize();
+
+      int maxRate = packet.getMaxRate();
+
+      boolean browseOnly = packet.isBrowseOnly();
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         if (autoCommitAcks)
+         Binding binding = postOffice.getBinding(queueName);
+
+         if (binding == null)
          {
-            doAck(ref);
+            throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
          }
+
+         securityStore.check(binding.getAddress(), CheckType.READ, this);
+
+         Filter filter = null;
+
+         if (filterString != null)
+         {
+            filter = new FilterImpl(filterString);
+         }
+
+         // Flow control values if specified on queue override those passed in from
+         // client
+
+         QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+
+         Integer queueWindowSize = qs.getConsumerWindowSize();
+
+         windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+
+         Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
+
+         maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+
+         Queue theQueue;
+         if (browseOnly)
+         {
+            // We consume a copy of the queue - TODO - this is a temporary measure
+            // and will disappear once we can provide a proper iterator on the queue
+
+            theQueue = new QueueImpl(-1, queueName, filter, false, false, false, null, postOffice);
+
+            // There's no need for any special locking since the list method is synchronized
+            List<MessageReference> refs = binding.getQueue().list(filter);
+
+            for (MessageReference ref : refs)
+            {
+               theQueue.addLast(ref);
+            }
+         }
          else
          {
-            tx.addAcknowledgement(ref);
-   
-            // Del count is not actually updated in storage unless it's
-            // cancelled
-            ref.incrementDeliveryCount();
+            theQueue = binding.getQueue();
          }
+
+         ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
+                                                          this,
+                                                          theQueue,
+                                                          filter,
+                                                          windowSize != -1,
+                                                          maxRate,
+                                                          started,
+                                                          browseOnly,
+                                                          storageManager,
+                                                          queueSettingsRepository,
+                                                          postOffice,
+                                                          channel);
+
+         response = new SessionCreateConsumerResponseMessage(windowSize);
+
+         consumers.put(consumer.getID(), consumer);
       }
+      catch (Exception e)
+      {
+         log.error("Failed to create consumer", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   public void rollback() throws Exception
+   public void handleCreateQueue(final SessionCreateQueueMessage packet)
    {
-      rollback(true);
+      SimpleString address = packet.getAddress();
+
+      SimpleString queueName = packet.getQueueName();
+
+      SimpleString filterString = packet.getFilterString();
+
+      boolean temporary = packet.isTemporary();
+
+      boolean durable = packet.isDurable();
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
+      {
+         // make sure the user has privileges to create this queue
+         if (!postOffice.containsDestination(address))
+         {
+            securityStore.check(address, CheckType.CREATE, this);
+         }
+
+         Binding binding = postOffice.getBinding(queueName);
+
+         if (binding != null)
+         {
+            throw new MessagingException(MessagingException.QUEUE_EXISTS);
+         }
+
+         Filter filter = null;
+
+         if (filterString != null)
+         {
+            filter = new FilterImpl(filterString);
+         }
+
+         binding = postOffice.addBinding(address, queueName, filter, durable, temporary);
+
+         if (temporary)
+         {
+            // Temporary queue in core simply means the queue will be deleted if
+            // the remoting connection
+            // dies. It does not mean it will get deleted automatically when the
+            // session is closed.
+            // It is up to the user to delete the queue when finished with it
+
+            final Queue queue = binding.getQueue();
+
+            failureRunners.add(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     postOffice.removeBinding(queue.getName());
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to remove temporary queue " + queue.getName(), e);
+                  }
+               }
+            });
+         }
+
+         response = new NullResponseMessage();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to create queue", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   private void doRollback(final Transaction theTx) throws Exception
+   public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
    {
-      boolean wasStarted = started;
+      SimpleString queueName = packet.getQueueName();
 
-      List<MessageReference> toCancel = new ArrayList<MessageReference>();
+      DelayedResult result = channel.replicatePacket(packet);
 
-      for (ServerConsumer consumer : consumers.values())
+      Packet response = null;
+
+      try
       {
-         if (wasStarted)
+         Binding binding = postOffice.removeBinding(queueName);
+
+         if (binding == null)
          {
-            consumer.setStarted(false);
+            throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
          }
 
-         toCancel.addAll(consumer.cancelRefs());
+         Queue queue = binding.getQueue();
+
+         if (queue.getConsumerCount() != 0)
+         {
+            throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+         }
+
+         if (queue.isDurable())
+         {
+            binding.getQueue().deleteAllReferences(storageManager);
+         }
+
+         response = new NullResponseMessage();
       }
+      catch (Exception e)
+      {
+         log.error("Failed to delete queue", e);
 
-      List<MessageReference> rolledBack = theTx.rollback(queueSettingsRepository);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
 
-      rolledBack.addAll(toCancel);
+      sendResponse(result, response);
+   }
 
-      if (wasStarted)
+   public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+   {
+      SimpleString queueName = packet.getQueueName();
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         for (ServerConsumer consumer : consumers.values())
+         if (queueName == null)
          {
-            consumer.setStarted(true);
+            throw new IllegalArgumentException("Queue name is null");
          }
+
+         Binding binding = postOffice.getBinding(queueName);
+
+         if (binding != null)
+         {
+            Queue queue = binding.getQueue();
+
+            Filter filter = queue.getFilter();
+
+            SimpleString filterString = filter == null ? null : filter.getFilterString();
+            // TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
+            response = new SessionQueueQueryResponseMessage(queue.isDurable(),
+                                                            queue.getConsumerCount(),
+                                                            queue.getMessageCount(),
+                                                            filterString,
+                                                            binding.getAddress());
+         }
+         else
+         {
+            response = new SessionQueueQueryResponseMessage();
+         }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to execute queue query", e);
 
-      // Now cancel the refs back to the queue(s), we sort into queues and cancel back atomically to
-      // preserve order
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
 
-      Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+      sendResponse(result, response);
+   }
 
-      for (MessageReference ref : rolledBack)
+   public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+   {
+      SimpleString address = packet.getAddress();
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+         if (address == null)
          {
-            Queue queue = ref.getQueue();
+            throw new IllegalArgumentException("Address is null");
+         }
 
-            LinkedList<MessageReference> list = queueMap.get(queue);
+         boolean exists = postOffice.containsDestination(address);
 
-            if (list == null)
+         List<SimpleString> queueNames = new ArrayList<SimpleString>();
+
+         if (exists)
+         {
+            List<Binding> bindings = postOffice.getBindingsForAddress(address);
+
+            for (Binding binding : bindings)
             {
-               list = new LinkedList<MessageReference>();
-
-               queueMap.put(queue, list);
+               queueNames.add(binding.getQueue().getName());
             }
+         }
 
-            list.add(ref);
+         response = new SessionBindingQueryResponseMessage(exists, queueNames);
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to execute binding query", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
          }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+      sendResponse(result, response);
+   }
+
+   /**
+    * Create a producer for the address specified in the packet
+    *
+    * @param packet The create producer request. It carries the address to produce to and the producer window
+    *           size to use for flow control. Specify a window size of -1 to disable flow control completely.
+    *           The actual window size used may be less than the specified window size if it is overridden by
+    *           any producer-window-size specified on the queue
+    */
+   public void handleCreateProducer(final SessionCreateProducerMessage packet)
+   {
+      SimpleString address = packet.getAddress();
+
+      int maxRate = packet.getMaxRate();
+
+      int windowSize = packet.getWindowSize();
+
+      boolean autoGroupID = packet.isAutoGroupId();
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         LinkedList<MessageReference> refs = entry.getValue();
+         FlowController flowController = null;
 
-         entry.getKey().addListFirst(refs);
+         final int maxRateToUse = maxRate;
+
+         if (address != null)
+         {
+            flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+         }
+
+         final int windowToUse = flowController == null ? -1 : windowSize;
+
+         // Server window size is 0.75 client window size for producer flow control
+         // (other way round to consumer flow control)
+
+         final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
+
+         ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
+                                                              this,
+                                                              address,
+                                                              flowController,
+                                                              serverWindowSize,
+                                                              channel);
+
+         producers.put(producer.getID(), producer);
+
+         // Get some initial credits to send to the producer - we try for
+         // windowToUse
+
+         int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+
+         SimpleString groupId = null;
+
+         if (autoGroupID)
+         {
+            groupId = simpleStringIdGenerator.generateID();
+         }
+
+         response = new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
       }
+      catch (Exception e)
+      {
+         log.error("Failed to create producer", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   private void rollback(final boolean sendResponse) throws Exception
+   public void handleAcknowledge(final SessionAcknowledgeMessage packet)
    {
-      if (tx == null)
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         // Might be null if XA
+         MessageReference ref = consumers.get(packet.getConsumerID()).getReference(packet.getMessageID());
 
-         tx = new TransactionImpl(storageManager, postOffice);
+         // Null implies a browser
+         if (ref != null)
+         {
+            if (autoCommitAcks)
+            {
+               doAck(ref);
+            }
+            else
+            {
+               tx.addAcknowledgement(ref);
+
+               // Del count is not actually updated in storage unless it's
+               // cancelled
+               ref.incrementDeliveryCount();
+            }
+         }
+
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to acknowledge", e);
 
-      doRollback(tx);
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+      }
 
-      tx = new TransactionImpl(storageManager, postOffice);
+      sendResponse(result, response);
    }
 
-   public void commit() throws Exception
+   public void handleCommit(final Packet packet)
    {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
       try
       {
          tx.commit();
+
+         response = new NullResponseMessage();
       }
+      catch (Exception e)
+      {
+         log.error("Failed to commit", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
       finally
       {
          tx = new TransactionImpl(storageManager, postOffice);
       }
+
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
+   public void handleRollback(final Packet packet)
    {
-      if (tx != null)
-      {
-         final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
+      DelayedResult result = channel.replicatePacket(packet);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-      }
+      Packet response = null;
 
-      Transaction theTx = resourceManager.removeTransaction(xid);
-
-      if (theTx == null)
+      try
       {
-         final String msg = "Cannot find xid in resource manager: " + xid;
+         rollback();
 
-         return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+         response = new NullResponseMessage();
       }
-
-      if (theTx.getState() == Transaction.State.SUSPENDED)
+      catch (Exception e)
       {
-         // Put it back
-         resourceManager.putTransaction(xid, tx);
+         log.error("Failed to rollback", e);
 
-         return new SessionXAResponseMessage(true,
-                                             XAException.XAER_PROTO,
-                                             "Cannot commit transaction, it is suspended " + xid);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      theTx.commit();
-
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
+   public void handleXACommit(final SessionXACommitMessage packet)
    {
-      if (tx != null && tx.getXid().equals(xid))
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      Xid xid = packet.getXid();
+
+      try
       {
-         if (tx.getState() == Transaction.State.SUSPENDED)
+         if (tx != null)
          {
-            final String msg = "Cannot end, transaction is suspended";
+            final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
 
-            return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
          }
+         else
+         {
+            Transaction theTx = resourceManager.removeTransaction(xid);
 
-         tx = null;
+            if (theTx == null)
+            {
+               final String msg = "Cannot find xid in resource manager: " + xid;
+
+               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            }
+            else
+            {
+               if (theTx.getState() == Transaction.State.SUSPENDED)
+               {
+                  // Put theTx back: it was removed above, and the session field tx is null on this path
+                  resourceManager.putTransaction(xid, theTx);
+
+                  response = new SessionXAResponseMessage(true,
+                                                          XAException.XAER_PROTO,
+                                                          "Cannot commit transaction, it is suspended " + xid);
+               }
+               else
+               {
+                  theTx.commit();
+
+                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+               }
+            }
+         }
       }
-      else
+      catch (Exception e)
       {
-         // It's also legal for the TM to call end for a Xid in the suspended
-         // state
-         // See JTA 1.1 spec 3.4.4 - state diagram
-         // Although in practice TMs rarely do this.
-         Transaction theTx = resourceManager.getTransaction(xid);
+         log.error("Failed to xa commit", e);
 
-         if (theTx == null)
+         if (e instanceof MessagingException)
          {
-            final String msg = "Cannot find suspended transaction to end " + xid;
-
-            return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            response = new MessagingExceptionMessage((MessagingException)e);
          }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
 
-         if (theTx.getState() != Transaction.State.SUSPENDED)
+      sendResponse(result, response);
+   }
+
+   public void handleXAEnd(final SessionXAEndMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      Xid xid = packet.getXid();
+
+      try
+      {
+         if (tx != null && tx.getXid().equals(xid))
          {
-            final String msg = "Transaction is not suspended " + xid;
+            if (tx.getState() == Transaction.State.SUSPENDED)
+            {
+               final String msg = "Cannot end, transaction is suspended";
 
-            return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+               response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            }
+            else
+            {
+               tx = null;
+            }
          }
+         else
+         {
+            // It's also legal for the TM to call end for a Xid in the suspended
+            // state
+            // See JTA 1.1 spec 3.4.4 - state diagram
+            // Although in practice TMs rarely do this.
+            Transaction theTx = resourceManager.getTransaction(xid);
 
-         theTx.resume();
+            if (theTx == null)
+            {
+               final String msg = "Cannot find suspended transaction to end " + xid;
+
+               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            }
+            else
+            {
+               if (theTx.getState() != Transaction.State.SUSPENDED)
+               {
+                  final String msg = "Transaction is not suspended " + xid;
+
+                  response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+               }
+               else
+               {
+                  theTx.resume();
+               }
+            }
+         }
+
+         if (response == null)
+         {
+            response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+         }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to xa end", e);
 
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAForget(final Xid xid)
+   public void handleXAForget(final SessionXAForgetMessage packet)
    {
+      DelayedResult result = channel.replicatePacket(packet);
+
       // Do nothing since we don't support heuristic commits / rollback from the
       // resource manager
 
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAJoin(final Xid xid) throws Exception
+   public void handleXAJoin(final SessionXAJoinMessage packet)
    {
-      Transaction theTx = resourceManager.getTransaction(xid);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      if (theTx == null)
+      Packet response = null;
+
+      Xid xid = packet.getXid();
+
+      try
       {
-         final String msg = "Cannot find xid in resource manager: " + xid;
+         Transaction theTx = resourceManager.getTransaction(xid);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+         if (theTx == null)
+         {
+            final String msg = "Cannot find xid in resource manager: " + xid;
+
+            response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+         }
+         else
+         {
+            if (theTx.getState() == Transaction.State.SUSPENDED)
+            {
+               response = new SessionXAResponseMessage(true,
+                                                       XAException.XAER_PROTO,
+                                                       "Cannot join tx, it is suspended " + xid);
+            }
+            else
+            {
+               tx = theTx;
+
+               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+            }
+         }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to xa join", e);
 
-      if (theTx.getState() == Transaction.State.SUSPENDED)
-      {
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      tx = theTx;
-
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAPrepare(final Xid xid) throws Exception
+   public void handleXAResume(final SessionXAResumeMessage packet)
    {
-      if (tx != null)
-      {
-         final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
+      DelayedResult result = channel.replicatePacket(packet);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-      }
+      Packet response = null;
 
-      Transaction theTx = resourceManager.getTransaction(xid);
+      Xid xid = packet.getXid();
 
-      if (theTx == null)
+      try
       {
-         final String msg = "Cannot find xid in resource manager: " + xid;
+         if (tx != null)
+         {
+            final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
 
-         return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
-      }
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         }
+         else
+         {
+            Transaction theTx = resourceManager.getTransaction(xid);
 
-      if (theTx.getState() == Transaction.State.SUSPENDED)
-      {
-         return new SessionXAResponseMessage(true,
-                                             XAException.XAER_PROTO,
-                                             "Cannot prepare transaction, it is suspended " + xid);
-      }
+            if (theTx == null)
+            {
+               final String msg = "Cannot find xid in resource manager: " + xid;
 
-      if (theTx.isEmpty())
-      {
-         return new SessionXAResponseMessage(false, XAResource.XA_RDONLY, null);
+               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            }
+            else
+            {
+               if (theTx.getState() != Transaction.State.SUSPENDED)
+               {
+                  response = new SessionXAResponseMessage(true,
+                                                          XAException.XAER_PROTO,
+                                                          "Cannot resume transaction, it is not suspended " + xid);
+               }
+               else
+               {
+                  tx = theTx;
+
+                  tx.resume();
+
+                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+               }
+            }
+         }
       }
-      else
+      catch (Exception e)
       {
-         theTx.prepare();
+         log.error("Failed to xa resume", e);
 
-         return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
+
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAResume(final Xid xid) throws Exception
+   public void handleXARollback(final SessionXARollbackMessage packet)
    {
-      if (tx != null)
-      {
-         final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
+      DelayedResult result = channel.replicatePacket(packet);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-      }
+      Packet response = null;
 
-      Transaction theTx = resourceManager.getTransaction(xid);
+      Xid xid = packet.getXid();
 
-      if (theTx == null)
+      try
       {
-         final String msg = "Cannot find xid in resource manager: " + xid;
+         if (tx != null)
+         {
+            final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
 
-         return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         }
+         else
+         {
+            Transaction theTx = resourceManager.removeTransaction(xid);
+
+            if (theTx == null)
+            {
+               final String msg = "Cannot find xid in resource manager: " + xid;
+
+               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            }
+            else
+            {
+               if (theTx.getState() == Transaction.State.SUSPENDED)
+               {
+                  // Put theTx back: it was removed above, and the session field tx is null on this path
+                  resourceManager.putTransaction(xid, theTx);
+
+                  response = new SessionXAResponseMessage(true,
+                                                          XAException.XAER_PROTO,
+                                                          "Cannot rollback transaction, it is suspended " + xid);
+               }
+               else
+               {
+                  doRollback(theTx);
+
+                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+               }
+            }
+         }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to xa rollback", e);
 
-      if (theTx.getState() != Transaction.State.SUSPENDED)
-      {
-         return new SessionXAResponseMessage(true,
-                                             XAException.XAER_PROTO,
-                                             "Cannot resume transaction, it is not suspended " + xid);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      tx = theTx;
-
-      tx.resume();
-
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XARollback(final Xid xid) throws Exception
+   public void handleXAStart(final SessionXAStartMessage packet)
    {
-      if (tx != null)
-      {
-         final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
+      DelayedResult result = channel.replicatePacket(packet);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-      }
+      Packet response = null;
 
-      Transaction theTx = resourceManager.removeTransaction(xid);
+      Xid xid = packet.getXid();
 
-      if (theTx == null)
+      try
       {
-         final String msg = "Cannot find xid in resource manager: " + xid;
+         if (tx != null)
+         {
+            final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
 
-         return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         }
+         else
+         {
+            tx = new TransactionImpl(xid, storageManager, postOffice);
+
+            boolean added = resourceManager.putTransaction(xid, tx);
+
+            if (!added)
+            {
+               final String msg = "Cannot start, there is already a xid " + tx.getXid();
+
+               response = new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+            }
+            else
+            {
+               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+            }
+         }
       }
-
-      if (theTx.getState() == Transaction.State.SUSPENDED)
+      catch (Exception e)
       {
-         // Put it back
-         resourceManager.putTransaction(xid, tx);
+         log.error("Failed to xa start", e);
 
-         return new SessionXAResponseMessage(true,
-                                             XAException.XAER_PROTO,
-                                             "Cannot rollback transaction, it is suspended " + xid);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      doRollback(theTx);
-
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XAStart(final Xid xid)
+   public void handleXASuspend(final Packet packet)
    {
-      if (tx != null)
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      try
       {
-         final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
+         if (tx == null)
+         {
+            final String msg = "Cannot suspend, session is not doing work in a transaction ";
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-      }
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         }
+         else
+         {
+            if (tx.getState() == Transaction.State.SUSPENDED)
+            {
+               final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
 
-      tx = new TransactionImpl(xid, storageManager, postOffice);
+               response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            }
+            else
+            {
+               tx.suspend();
 
-      boolean added = resourceManager.putTransaction(xid, tx);
+               tx = null;
 
-      if (!added)
+               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+            }
+         }
+      }
+      catch (Exception e)
       {
-         final String msg = "Cannot start, there is already a xid " + tx.getXid();
+         log.error("Failed to xa suspend", e);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      sendResponse(result, response);
    }
 
-   public SessionXAResponseMessage XASuspend() throws Exception
+   public void handleXAPrepare(final SessionXAPrepareMessage packet)
    {
-      if (tx == null)
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = null;
+
+      Xid xid = packet.getXid();
+
+      try
       {
-         final String msg = "Cannot suspend, session is not doing work in a transaction ";
+         if (tx != null)
+         {
+            final String msg = "Cannot prepare, session is currently doing work in a transaction " + tx.getXid();
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         }
+         else
+         {
+            Transaction theTx = resourceManager.getTransaction(xid);
+
+            if (theTx == null)
+            {
+               final String msg = "Cannot find xid in resource manager: " + xid;
+
+               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+            }
+            else
+            {
+               if (theTx.getState() == Transaction.State.SUSPENDED)
+               {
+                  response = new SessionXAResponseMessage(true,
+                                                          XAException.XAER_PROTO,
+                                                          "Cannot prepare transaction, it is suspended " + xid);
+               }
+               else
+               {
+                  if (theTx.isEmpty())
+                  {
+                     response = new SessionXAResponseMessage(false, XAResource.XA_RDONLY, null);
+                  }
+                  else
+                  {
+                     theTx.prepare();
+
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  }
+               }
+            }
+         }
       }
-
-      if (tx.getState() == Transaction.State.SUSPENDED)
+      catch (Exception e)
       {
-         final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
+         log.error("Failed to xa prepare", e);
 
-         return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
 
-      tx.suspend();
+      sendResponse(result, response);
+   }
 
-      tx = null;
+   public void handleGetInDoubtXids(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
 
-      return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
+
+      sendResponse(result, response);
    }
 
-   public List<Xid> getInDoubtXids() throws Exception
+   public void handleGetXATimeout(final Packet packet)
    {
-      return resourceManager.getPreparedTransactions();
+      DelayedResult result = channel.replicatePacket(packet);
+
+      Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
+
+      sendResponse(result, response);
    }
 
-   public int getXATimeout()
+   public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
    {
-      return resourceManager.getTimeoutSeconds();
-   }
+      DelayedResult result = channel.replicatePacket(packet);
 
-   public boolean setXATimeout(final int timeoutSeconds)
-   {
-      return resourceManager.setTimeoutSeconds(timeoutSeconds);
+      Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
+
+      sendResponse(result, response);
    }
 
-   public void addDestination(final SimpleString address, final boolean durable, final boolean temporary) throws Exception
+   public void handleAddDestination(final SessionAddDestinationMessage packet)
    {
-      securityStore.check(address, CheckType.CREATE, this);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      if (!postOffice.addDestination(address, durable))
-      {
-         throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
-      }
+      Packet response = null;
 
-      if (temporary)
+      final SimpleString address = packet.getAddress();
+
+      final boolean durable = packet.isDurable();
+
+      final boolean temporary = packet.isTemporary();
+
+      try
       {
-         // Temporary address in core simply means the address will be deleted
-         // if the remoting connection
-         // dies. It does not mean it will get deleted automatically when the
-         // session is closed.
-         // It is up to the user to delete the address when finished with it
+         securityStore.check(address, CheckType.CREATE, this);
 
-         failureRunners.add(new Runnable()
+         if (!postOffice.addDestination(address, durable))
          {
-            public void run()
+            throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+         }
+
+         if (temporary)
+         {
+            // Temporary address in core simply means the address will be deleted
+            // if the remoting connection
+            // dies. It does not mean it will get deleted automatically when the
+            // session is closed.
+            // It is up to the user to delete the address when finished with it
+
+            failureRunners.add(new Runnable()
             {
-               try
+               public void run()
                {
-                  postOffice.removeDestination(address, durable);
+                  try
+                  {
+                     postOffice.removeDestination(address, durable);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to remove temporary address " + address, e);
+                  }
                }
-               catch (Exception e)
-               {
-                  log.error("Failed to remove temporary address " + address);
-               }
-            }
-         });
+            });
+         }
+
+         response = new NullResponseMessage();
       }
+      catch (Exception e)
+      {
+         log.error("Failed to add destination", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   public void removeDestination(final SimpleString address, final boolean durable) throws Exception
+   public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
    {
-      securityStore.check(address, CheckType.CREATE, this);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      if (!postOffice.removeDestination(address, durable))
+      Packet response = null;
+
+      final SimpleString address = packet.getAddress();
+
+      final boolean durable = packet.isDurable();
+
+      try
       {
-         throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
+         securityStore.check(address, CheckType.CREATE, this);
+
+         if (!postOffice.removeDestination(address, durable))
+         {
+            throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
+                                         "Address does not exist: " + address);
+         }
+
+         response = new NullResponseMessage();
       }
+      catch (Exception e)
+      {
+         log.error("Failed to remove destination", e);
+
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
+
+      sendResponse(result, response);
    }
 
-   public void createQueue(final SimpleString address,
-                           final SimpleString queueName,
-                           final SimpleString filterString,
-                           final boolean durable,
-                           final boolean temporary) throws Exception
+   private void lockConsumers()
    {
-      // make sure the user has privileges to create this queue
-      if (!postOffice.containsDestination(address))
+      for (ServerConsumer consumer : consumers.values())
       {
-         securityStore.check(address, CheckType.CREATE, this);
+         consumer.lock();
       }
-      Binding binding = postOffice.getBinding(queueName);
+   }
 
-      if (binding != null)
+   private void unlockConsumers()
+   {
+      for (ServerConsumer consumer : consumers.values())
       {
-         throw new MessagingException(MessagingException.QUEUE_EXISTS);
+         consumer.unlock();
       }
+   }
 
-      Filter filter = null;
-
-      if (filterString != null)
+   public void handleStart(final Packet packet)
+   {
+      boolean lock = this.channel.getReplicatingChannel() != null;
+      
+      if (lock)
       {
-         filter = new FilterImpl(filterString);
+         lockConsumers();
       }
 
-      binding = postOffice.addBinding(address, queueName, filter, durable, temporary);
-
-      if (temporary)
+      //We need to prevent any delivery and replication of delivery occurring while the start/stop
+      //is being processed.
+      //Otherwise we can end up with start/stop being processed in different order on backup to live.
+      //Which can result in, say, a delivery arriving at backup, but it's still not started!
+      try
       {
-         // Temporary queue in core simply means the queue will be deleted if
-         // the remoting connection
-         // dies. It does not mean it will get deleted automatically when the
-         // session is closed.
-         // It is up to the user to delete the queue when finished with it
+         channel.replicatePacket(packet);
 
-         final Queue queue = binding.getQueue();
+         // set started will unlock
+         setStarted(true);
 
-         failureRunners.add(new Runnable()
+      }
+      finally
+      {
+         if (lock)
          {
-            public void run()
-            {
-               try
-               {
-                  postOffice.removeBinding(queue.getName());
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to remove temporary queue " + queue.getName());
-               }
-            }
-         });
+            unlockConsumers();
+         }
       }
    }
 
-   public void deleteQueue(final SimpleString queueName) throws Exception
+   public void handleStop(final Packet packet)
    {
-      Binding binding = postOffice.removeBinding(queueName);
-
-      if (binding == null)
+      boolean lock = this.channel.getReplicatingChannel() != null;
+      
+      if (lock)
       {
-         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+         lockConsumers();
       }
 
-      Queue queue = binding.getQueue();
+      try
+      {
+         DelayedResult result = channel.replicatePacket(packet);
 
-      if (queue.getConsumerCount() != 0)
+         setStarted(false);
+
+         sendResponse(result, new NullResponseMessage());
+      }
+      finally
       {
-         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+         if (lock)
+         {
+            unlockConsumers();
+         }
       }
+   }
 
-      if (queue.isDurable())
+   public void handleFailedOver(final Packet packet)
+   {
+      // No need to replicate since this only occurs after failover
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+      for (ServerConsumer consumer : consumersClone)
       {
-         binding.getQueue().deleteAllReferences(storageManager);
+         consumer.failedOver();
       }
    }
 
-   public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
-                                                              final SimpleString filterString,
-                                                              int windowSize,
-                                                              int maxRate,
-                                                              final boolean browseOnly) throws Exception
+   public void handleClose(final Packet packet)
    {
-      Binding binding = postOffice.getBinding(queueName);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      if (binding == null)
+      Packet response = null;
+
+      try
       {
-         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+         close();
+
+         response = new NullResponseMessage();
       }
+      catch (Exception e)
+      {
+         log.error("Failed to close", e);
 
-      securityStore.check(binding.getAddress(), CheckType.READ, this);
+         if (e instanceof MessagingException)
+         {
+            response = new MessagingExceptionMessage((MessagingException)e);
+         }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
+      }
 
-      Filter filter = null;
+      sendResponse(result, response, true);
+   }
 
-      if (filterString != null)
+   private void setStarted(final boolean s)
+   {
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+      for (ServerConsumer consumer : consumersClone)
       {
-         filter = new FilterImpl(filterString);
+         consumer.setStarted(s);
       }
 
-      // Flow control values if specified on queue override those passed in from
-      // client
+      started = s;
+   }
 
-      QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+   public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+   {
+      consumers.get(packet.getConsumerID()).handleClose(packet);
+   }
 
-      Integer queueWindowSize = qs.getConsumerWindowSize();
+   public void handleCloseProducer(final SessionProducerCloseMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
 
-      windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+      Packet response = null;
 
-      Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
+      try
+      {
+         producers.get(packet.getProducerID()).close();
 
-      maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-
-      Queue theQueue;
-      if (browseOnly)
+         response = new NullResponseMessage();
+      }
+      catch (Exception e)
       {
-         // We consume a copy of the queue - TODO - this is a temporary measure
-         // and will disappear once we can provide a proper iterator on the queue
+         log.error("Failed to close producer", e);
 
-         theQueue = new QueueImpl(-1,
-                                  queueName,
-                                  filter,
-                                  false,
-                                  false,
-                                  false,
-                                  null,
-                                  postOffice);
-
-         //There's no need for any special locking since the list method is synchronized
-         List<MessageReference> refs = binding.getQueue().list(filter);
-
-         for (MessageReference ref : refs)
+         if (e instanceof MessagingException)
          {
-            theQueue.addLast(ref);
+            response = new MessagingExceptionMessage((MessagingException)e);
          }
+         else
+         {
+            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+         }
       }
-      else
-      {
-         theQueue = binding.getQueue();
-      }
 
-      ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
-                                                       this,
-                                                       theQueue,
-                                                       filter,
-                                                       windowSize != -1,
-                                                       maxRate,
-                                                       started,
-                                                       browseOnly,
-                                                       storageManager,
-                                                       queueSettingsRepository,
-                                                       postOffice,
-                                                       channel);
+      sendResponse(result, response);
+   }
 
-      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
+   public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+   {
+      channel.replicatePacket(packet);
 
-      consumers.put(consumer.getID(), consumer);
-
-      return response;
+      try
+      {
+         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to receive credits", e);
+      }
    }
 
-   public SessionQueueQueryResponseMessage executeQueueQuery(final SimpleString queueName) throws Exception
+   public void handleSendProducerMessage(final SessionSendMessage packet)
    {
-      if (queueName == null)
+      ServerMessage msg = packet.getServerMessage();
+
+      if (msg.getMessageID() == 0L)
       {
-         throw new IllegalArgumentException("Queue name is null");
+         // must generate message id here, so we know they are in sync on live and backup
+         long id = storageManager.generateUniqueID();
+
+         msg.setMessageID(id);
       }
 
-      Binding binding = postOffice.getBinding(queueName);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      SessionQueueQueryResponseMessage response;
+      Packet response = null;
 
-      if (binding != null)
+      try
       {
-         Queue queue = binding.getQueue();
+         producers.get(packet.getProducerID()).send(msg);
 
-         Filter filter = queue.getFilter();
-
-         SimpleString filterString = filter == null ? null : filter.getFilterString();
-         // TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
-         response = new SessionQueueQueryResponseMessage(queue.isDurable(),
-                                                         queue.getConsumerCount(),
-                                                         queue.getMessageCount(),
-                                                         filterString,
-                                                         binding.getAddress());
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
       }
-      else
+      catch (Exception e)
       {
-         response = new SessionQueueQueryResponseMessage();
+         log.error("Failed to send message", e);
+
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
       }
 
-      return response;
+      sendResponse(result, response);
    }
 
-   public SessionBindingQueryResponseMessage executeBindingQuery(final SimpleString address) throws Exception
+   public void handleSendScheduledProducerMessage(final SessionScheduledSendMessage packet)
    {
-      if (address == null)
+      ServerMessage msg = packet.getServerMessage();
+
+      if (msg.getMessageID() == 0L)
       {
-         throw new IllegalArgumentException("Address is null");
+         // must generate message id here, so we know they are in sync on live and backup
+         long id = storageManager.generateUniqueID();
+
+         msg.setMessageID(id);
       }
 
-      boolean exists = postOffice.containsDestination(address);
+      DelayedResult result = channel.replicatePacket(packet);
 
-      List<SimpleString> queueNames = new ArrayList<SimpleString>();
+      Packet response = null;
 
-      if (exists)
+      try
       {
-         List<Binding> bindings = postOffice.getBindingsForAddress(address);
+         producers.get(packet.getProducerID()).sendScheduled(msg, packet.getScheduledDeliveryTime());
 
-         for (Binding binding : bindings)
+         if (packet.isRequiresResponse())
          {
-            queueNames.add(binding.getQueue().getName());
+            response = new NullResponseMessage();
          }
       }
+      catch (Exception e)
+      {
+         log.error("Failed to send scheduled message", e);
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+      }
 
-      return new SessionBindingQueryResponseMessage(exists, queueNames);
+      sendResponse(result, response);
    }
 
-   /**
-    * Create a producer for the specified address
-    *
-    * @param address The address to produce too
-    * @param windowSize - the producer window size to use for flow control. Specify -1 to disable flow control
-    *           completely The actual window size used may be less than the specified window size if it is overridden by
-    *           any producer-window-size specified on the queue
-    */
-   public SessionCreateProducerResponseMessage createProducer(final SimpleString address,
-                                                              final int windowSize,
-                                                              final int maxRate,
-                                                              final boolean autoGroupId) throws Exception
+   public void handleManagementMessage(final SessionSendManagementMessage packet)
    {
-      FlowController flowController = null;
+      DelayedResult result = channel.replicatePacket(packet);
 
-      final int maxRateToUse = maxRate;
+      Packet response = null;
 
-      if (address != null)
+      try
       {
-         flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
-      }
+         ServerMessage serverMessage = packet.getServerMessage();
 
-      final int windowToUse = flowController == null ? -1 : windowSize;
+         if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
+         {
+            boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
 
-      // Server window size is 0.75 client window size for producer flow control
-      // (other way round to consumer flow control)
+            final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
 
-      final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
+            if (subscribe)
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("added notification listener " + this);
+               }
 
-      ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
-                                                           this,
-                                                           address,
-                                                           flowController,
-                                                           serverWindowSize,
-                                                           channel);
+               managementService.addNotificationListener(this, null, replyTo);
+            }
+            else
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("removed notification listener " + this);
+               }
 
-      producers.put(producer.getID(), producer);
+               managementService.removeNotificationListener(this);
+            }
+         }
+         else
+         {
+            managementService.handleMessage(serverMessage);
 
-      // Get some initial credits to send to the producer - we try for
-      // windowToUse
+            serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
 
-      int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+            send(serverMessage);
+         }
 
-      SimpleString groupId = null;
-      if (autoGroupId)
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
+      }
+      catch (Exception e)
       {
-         groupId = simpleStringIdGenerator.generateID();
+         log.error("Failed to send management message", e);
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
       }
-      return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
-   }
 
-   public void closeConsumer(final long consumerID) throws Exception
-   {
-      consumers.get(consumerID).close();
+      sendResponse(result, response);
    }
 
-   public void closeProducer(final long producerID) throws Exception
+   public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
    {
-      producers.get(producerID).close();
-   }
+      ServerConsumer consumer = consumers.get(packet.getConsumerID());
 
-   public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception
-   {
-      consumers.get(consumerID).receiveCredits(credits);
-   }
+      if (consumer == null)
+      {
+         throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
+      }
 
-   public void sendProducerMessage(final long producerID, final ServerMessage message) throws Exception
-   {
-      producers.get(producerID).send(message);
+      try
+      {
+         consumer.deliverReplicated(packet.getMessageID());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle replicated delivery", e);
+      }
    }
 
-   public void sendScheduledProducerMessage(final long producerID,
-                                            final ServerMessage message,
-                                            final long scheduledDeliveryTime) throws Exception
-   {
-      producers.get(producerID).sendScheduled(message, scheduledDeliveryTime);
-   }
-
    public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
    {
       remotingConnection.removeFailureListener(this);
@@ -1088,53 +1879,6 @@
       return serverLastReceivedCommandID;
    }
 
-   public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
-   {
-      ServerMessage serverMessage = message.getServerMessage();
-
-      if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
-      {
-         boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
-
-         final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-
-         if (subscribe)
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("added notification listener " + this);
-            }
-
-            managementService.addNotificationListener(this, null, replyTo);
-         }
-         else
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("removed notification listener " + this);
-            }
-
-            managementService.removeNotificationListener(this);
-         }
-         return;
-      }
-      managementService.handleMessage(message.getServerMessage());
-
-      serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
-      send(serverMessage);
-   }
-
-   public void handleReplicatedDelivery(long consumerID, long messageID) throws Exception
-   {
-      ServerConsumer consumer = consumers.get(consumerID);
-
-      if (consumer != null)
-      {
-         consumer.deliverReplicated(messageID);
-      }
-   }
-
    // FailureListener implementation
    // --------------------------------------------------------------------
 
@@ -1155,6 +1899,8 @@
          }
 
          close();
+
+         channel.close();
       }
       catch (Throwable t)
       {
@@ -1193,6 +1939,80 @@
    // Private
    // ----------------------------------------------------------------------------
 
+   private void doRollback(final Transaction theTx) throws Exception
+   {
+      boolean wasStarted = started;
+
+      List<MessageReference> toCancel = new ArrayList<MessageReference>();
+
+      for (ServerConsumer consumer : consumers.values())
+      {
+         if (wasStarted)
+         {
+            consumer.setStarted(false);
+         }
+
+         toCancel.addAll(consumer.cancelRefs());
+      }
+
+      List<MessageReference> rolledBack = theTx.rollback(queueSettingsRepository);
+
+      rolledBack.addAll(toCancel);
+
+      if (wasStarted)
+      {
+         for (ServerConsumer consumer : consumers.values())
+         {
+            consumer.setStarted(true);
+         }
+      }
+
+      // Now cancel the refs back to the queue(s): we group them by queue and cancel each group back
+      // atomically to preserve order
+
+      Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+
+      for (MessageReference ref : rolledBack)
+      {
+         if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+         {
+            Queue queue = ref.getQueue();
+
+            LinkedList<MessageReference> list = queueMap.get(queue);
+
+            if (list == null)
+            {
+               list = new LinkedList<MessageReference>();
+
+               queueMap.put(queue, list);
+            }
+
+            list.add(ref);
+         }
+      }
+
+      for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+      {
+         LinkedList<MessageReference> refs = entry.getValue();
+
+         entry.getKey().addListFirst(refs);
+      }
+   }
+
+   private void rollback() throws Exception
+   {
+      if (tx == null)
+      {
+         // Might be null if XA
+
+         tx = new TransactionImpl(storageManager, postOffice);
+      }
+
+      doRollback(tx);
+
+      tx = new TransactionImpl(storageManager, postOffice);
+   }
+
    private void doAck(final MessageReference ref) throws Exception
    {
       ServerMessage message = ref.getMessage();
@@ -1236,4 +2056,41 @@
          throw e;
       }
    }
+
+   private void sendResponse(final DelayedResult result, final Packet response, final boolean closeChannel)
+   {
+      if (response != null)
+      {
+         if (result == null)
+         {
+            // Not clustered - just send now
+            channel.send(response);
+
+            if (closeChannel)
+            {
+               channel.close();
+            }
+         }
+         else
+         {
+            result.setResultRunner(new Runnable()
+            {
+               public void run()
+               {
+                  channel.send(response);
+
+                  if (closeChannel)
+                  {
+                     channel.close();
+                  }
+               }
+            });
+         }
+      }
+   }
+
+   public void sendResponse(final DelayedResult result, final Packet response)
+   {
+      this.sendResponse(result, response, false);
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -48,19 +48,10 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
 
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -80,16 +71,12 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
 /**
@@ -107,18 +94,13 @@
 
    private final Channel channel;
 
-   private final StorageManager storageManager;
-
    public ServerSessionPacketHandler(final ServerSession session,
-                                     final Channel channel,
-                                     final StorageManager storageManager)
+                                     final Channel channel)
 
    {
       this.session = session;
 
       this.channel = channel;
-
-      this.storageManager = storageManager;
    }
 
    public long getID()
@@ -130,25 +112,6 @@
    {
       byte type = packet.getType();
 
-      if (type == SESS_SEND || type == SESS_SCHEDULED_SEND)
-      {
-         SessionSendMessage send = (SessionSendMessage)packet;
-
-         ServerMessage msg = send.getServerMessage();
-
-         if (msg.getMessageID() == 0L)
-         {
-            // must generate message id here, so we know they are in sync on live and backup
-            long id = storageManager.generateUniqueID();
-
-            send.getServerMessage().setMessageID(id);
-         }
-      }
-
-      Packet response = null;
-
-      DelayedResult result = channel.replicatePacket(packet);
-
       try
       {
          switch (type)
@@ -156,220 +119,184 @@
             case SESS_CREATECONSUMER:
             {
                SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
-               response = session.createConsumer(request.getQueueName(),
-                                                 request.getFilterString(),
-                                                 request.getWindowSize(),
-                                                 request.getMaxRate(),
-                                                 request.isBrowseOnly());
+               session.handleCreateConsumer(request);
                break;
             }
             case SESS_CREATEQUEUE:
             {
                SessionCreateQueueMessage request = (SessionCreateQueueMessage)packet;
-               session.createQueue(request.getAddress(),
-                                   request.getQueueName(),
-                                   request.getFilterString(),
-                                   request.isDurable(),
-                                   request.isTemporary());
-               response = new NullResponseMessage();
+               session.handleCreateQueue(request);
                break;
             }
             case SESS_DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
-               session.deleteQueue(request.getQueueName());
-               response = new NullResponseMessage();
+               session.handleDeleteQueue(request);               
                break;
             }
             case SESS_QUEUEQUERY:
             {
                SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
-               response = session.executeQueueQuery(request.getQueueName());
+               session.handleExecuteQueueQuery(request);
                break;
             }
             case SESS_BINDINGQUERY:
             {
                SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
-               response = session.executeBindingQuery(request.getAddress());
+               session.handleExecuteBindingQuery(request);
                break;
             }
             case SESS_CREATEPRODUCER:
             {
                SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
-               response = session.createProducer(request.getAddress(),
-                                                 request.getWindowSize(),
-                                                 request.getMaxRate(),
-                                                 request.isAutoGroupId());
+               session.handleCreateProducer(request);
                break;
             }
             case SESS_ACKNOWLEDGE:
             {
                SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
-               session.acknowledge(message.getConsumerID(), message.getMessageID());
-               if (message.isRequiresResponse())
-               {
-                  response = new NullResponseMessage();
-               }
+               session.handleAcknowledge(message);               
                break;
             }
             case SESS_COMMIT:
             {
-               session.commit();
-               response = new NullResponseMessage();
+               session.handleCommit(packet);               
                break;
             }
             case SESS_ROLLBACK:
             {
-               session.rollback();
-               response = new NullResponseMessage();
+               session.handleRollback(packet);
                break;
             }
             case SESS_XA_COMMIT:
             {
                SessionXACommitMessage message = (SessionXACommitMessage)packet;
-               response = session.XACommit(message.isOnePhase(), message.getXid());
+               session.handleXACommit(message);
                break;
             }
             case SESS_XA_END:
             {
                SessionXAEndMessage message = (SessionXAEndMessage)packet;
-               response = session.XAEnd(message.getXid(), message.isFailed());
+               session.handleXAEnd(message);
                break;
             }
             case SESS_XA_FORGET:
             {
                SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
-               response = session.XAForget(message.getXid());
+               session.handleXAForget(message);
                break;
             }
             case SESS_XA_JOIN:
             {
                SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
-               response = session.XAJoin(message.getXid());
+               session.handleXAJoin(message);
                break;
             }
             case SESS_XA_RESUME:
             {
                SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
-               response = session.XAResume(message.getXid());
+               session.handleXAResume(message);
                break;
             }
             case SESS_XA_ROLLBACK:
             {
                SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
-               response = session.XARollback(message.getXid());
+               session.handleXARollback(message);
                break;
             }
             case SESS_XA_START:
             {
                SessionXAStartMessage message = (SessionXAStartMessage)packet;
-               response = session.XAStart(message.getXid());
+               session.handleXAStart(message);
                break;
             }
             case SESS_XA_SUSPEND:
             {
-               response = session.XASuspend();
+               session.handleXASuspend(packet);
                break;
             }
             case SESS_XA_PREPARE:
             {
                SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
-               response = session.XAPrepare(message.getXid());
+               session.handleXAPrepare(message);
                break;
             }
             case SESS_XA_INDOUBT_XIDS:
             {
-               List<Xid> xids = session.getInDoubtXids();
-               response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+               session.handleGetInDoubtXids(packet);               
                break;
             }
             case SESS_XA_GET_TIMEOUT:
             {
-               response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+               session.handleGetXATimeout(packet);
                break;
             }
             case SESS_XA_SET_TIMEOUT:
             {
                SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
-               response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message.getTimeoutSeconds()));
+               session.handleSetXATimeout(message);
                break;
             }
             case SESS_ADD_DESTINATION:
             {
                SessionAddDestinationMessage message = (SessionAddDestinationMessage)packet;
-               session.addDestination(message.getAddress(), message.isDurable(), message.isTemporary());
-               response = new NullResponseMessage();
+               session.handleAddDestination(message);
                break;
             }
             case SESS_REMOVE_DESTINATION:
             {
                SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
-               session.removeDestination(message.getAddress(), message.isDurable());
-               response = new NullResponseMessage();
+               session.handleRemoveDestination(message);              
                break;
             }
             case SESS_START:
             {
-               session.setStarted(true);
+               session.handleStart(packet);
                break;
             }
             case SESS_FAILOVER_COMPLETE:
             {
-               session.failedOver();
+               session.handleFailedOver(packet);
                break;
             }
             case SESS_STOP:
             {
-               session.setStarted(false);
-               response = new NullResponseMessage();
+               session.handleStop(packet);
                break;
             }
             case SESS_CLOSE:
             {
-               session.close();
-               response = new NullResponseMessage();
+               session.handleClose(packet);
                break;
             }
             case SESS_CONSUMER_CLOSE:
             {
                SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
-               session.closeConsumer(message.getConsumerID());
-               response = new NullResponseMessage();
+               session.handleCloseConsumer(message);
                break;
             }
             case SESS_PRODUCER_CLOSE:
             {
                SessionProducerCloseMessage message = (SessionProducerCloseMessage)packet;
-               session.closeProducer(message.getProducerID());
-               response = new NullResponseMessage();
+               session.handleCloseProducer(message);
                break;
             }
             case SESS_FLOWTOKEN:
             {
                SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
-               session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
+               session.handleReceiveConsumerCredits(message);
                break;
             }
             case SESS_SEND:
             {
                SessionSendMessage message = (SessionSendMessage)packet;
-               session.sendProducerMessage(message.getProducerID(), message.getServerMessage());
-               if (message.isRequiresResponse())
-               {
-                  response = new NullResponseMessage();
-               }
+               session.handleSendProducerMessage(message);
                break;
             }
             case SESS_SCHEDULED_SEND:
             {
                SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
-               session.sendScheduledProducerMessage(message.getProducerID(),
-                                                    message.getServerMessage(),
-                                                    message.getScheduledDeliveryTime());
-               if (message.isRequiresResponse())
-               {
-                  response = new NullResponseMessage();
-               }
+               session.handleSendScheduledProducerMessage(message);
                break;
             }
             case SESS_MANAGEMENT_SEND:
@@ -381,67 +308,16 @@
             case SESS_REPLICATE_DELIVERY:
             {
                SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;
-               session.handleReplicatedDelivery(message.getConsumerID(), message.getMessageID());
+               session.handleReplicatedDelivery(message);
                break;
             }
-            default:
-            {
-               response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
-                                                                               "Unsupported packet " + type));
-            }
          }
       }
       catch (Throwable t)
       {
-         MessagingException me;
-
          log.error("Caught unexpected exception", t);
-
-         if (t instanceof MessagingException)
-         {
-            me = (MessagingException)t;
-         }
-         else
-         {
-            me = new MessagingException(MessagingException.INTERNAL_ERROR);
-         }
-
-         response = new MessagingExceptionMessage(me);
       }
 
-      if (response != null)
-      {
-         final boolean closeChannel = type == SESS_CLOSE;
-
-         if (result == null)
-         {
-            // Not clustered - just send now
-            channel.send(response);              
-            
-            if (closeChannel)
-            {
-               channel.close();
-            }
-         }
-         else
-         {
-            final Packet theResponse = response;
-            
-            result.setResultRunner(new Runnable()
-            {
-               public void run()
-               {
-                  channel.send(theResponse);
-                  
-                  if (closeChannel)
-                  {
-                     channel.close();
-                  }
-               }
-            });
-         }
-      }
-
       channel.replicateComplete();
    }
 }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -3760,7 +3760,7 @@
          {
             conn.close();
          }
-      }
+      }      
    }
 
    // http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
@@ -4115,14 +4115,14 @@
          {
             TextMessage tm = (TextMessage) m;
 
-            log.trace("Got message:" + tm.getText() + " count is " + count);
+            log.info("Got message:" + tm.getText() + " count is " + count);
 
             messageOrder += tm.getText() + " ";
             if (count == 0)
             {
                if (!("a".equals(tm.getText())))
                {
-                  log.trace("Should be a but was " + tm.getText());
+                  log.info("Should be a but was " + tm.getText());
                   failed = true;
                   latch.countDown();
                }
@@ -4130,14 +4130,14 @@
                {
                   sess.rollback();
                   messageOrder += "RB ";
-                  log.trace("rollback() called");
+                  log.info("rollback() called");
                }
                else
                {
-                  log.trace("Calling recover");
+                  log.info("Calling recover");
                   messageOrder += "RC ";
                   sess.recover();
-                  log.trace("recover() called");
+                  log.info("recover() called");
                }
             }
 
@@ -4145,7 +4145,7 @@
             {
                if (!("a".equals(tm.getText())))
                {
-                  log.trace("Should be a but was " + tm.getText());
+                  log.info("Should be a but was " + tm.getText());
                   failed = true;
                   latch.countDown();
                }
@@ -4159,7 +4159,7 @@
             {
                if (!("b".equals(tm.getText())))
                {
-                  log.trace("Should be b but was " + tm.getText());
+                  log.info("Should be b but was " + tm.getText());
                   failed = true;
                   latch.countDown();
                }
@@ -4168,7 +4168,7 @@
             {
                if (!("c".equals(tm.getText())))
                {
-                  log.trace("Should be c but was " + tm.getText());
+                  log.info("Should be c but was " + tm.getText());
                   failed = true;
                   latch.countDown();
                }
@@ -4178,7 +4178,7 @@
                }
                else
                {
-                  log.trace("Acknowledging session");
+                  log.info("Acknowledging session");
                   tm.acknowledge();
                }
                latch.countDown();
@@ -4187,7 +4187,7 @@
          }
          catch (JMSException e)
          {
-            log.trace("Caught JMSException", e);
+            log.info("Caught JMSException", e);
             failed = true;
             latch.countDown();
          }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -98,8 +98,10 @@
       }
       finally
       {
-         if (conn1 != null) conn1.close();
-         if (conn2 != null) conn2.close();
+         if (conn1 != null)
+            conn1.close();
+         if (conn2 != null)
+            conn2.close();
       }
    }
 
@@ -116,7 +118,8 @@
       }
       finally
       {
-         if (conn1 != null) conn1.close();
+         if (conn1 != null)
+            conn1.close();
       }
    }
 
@@ -134,11 +137,12 @@
       }
       catch (JMSSecurityException e)
       {
-         //Expected
+         // Expected
       }
       finally
       {
-         if (conn1 != null) conn1.close();
+         if (conn1 != null)
+            conn1.close();
       }
    }
 
@@ -156,11 +160,12 @@
       }
       catch (JMSSecurityException e)
       {
-         //Expected
+         // Expected
       }
       finally
       {
-         if (conn1 != null) conn1.close();
+         if (conn1 != null)
+            conn1.close();
       }
    }
 
@@ -169,7 +174,7 @@
    /**
     * user/pwd with preconfigured clientID, should return preconf
     */
-    public void testPreConfClientID() throws Exception
+   public void testPreConfClientID() throws Exception
    {
       Connection conn = null;
       try
@@ -177,7 +182,7 @@
          ArrayList<String> bindings = new ArrayList<String>();
          bindings.add("preConfcf");
          deployConnectionFactory("dilbert-id", "preConfcf", bindings);
-         ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
+         ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
          conn = cf.createConnection("dilbert", "dogbert");
          String clientID = conn.getClientID();
          assertEquals("Invalid ClientID", "dilbert-id", clientID);
@@ -205,7 +210,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -220,7 +226,7 @@
          ArrayList<String> bindings = new ArrayList<String>();
          bindings.add("preConfcf");
          deployConnectionFactory("dilbert-id", "preConfcf", bindings);
-         ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
+         ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
          conn = cf.createConnection("dilbert", "dogbert");
          conn.setClientID("myID");
          fail();
@@ -252,11 +258,12 @@
       }
       catch (IllegalStateException e)
       {
-         //Expected
+         // Expected
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -343,7 +350,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -358,7 +366,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -374,11 +383,12 @@
       }
       catch (JMSSecurityException e)
       {
-         //Expected
+         // Expected
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -392,7 +402,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -406,7 +417,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -420,7 +432,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -434,7 +447,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -449,8 +463,8 @@
          ArrayList<String> bindings = new ArrayList<String>();
          bindings.add("preConfcf");
          deployConnectionFactory("dilbert-id", "preConfcf", bindings);
-         ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
-         //setSecurityConfig(oldDefaultConfig);
+         ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
+         // setSecurityConfig(oldDefaultConfig);
          conn = cf.createConnection("dilbert", "dogbert");
          assertTrue(this.canCreateDurableSub(conn, topic1, "sub2"));
       }
@@ -476,11 +490,11 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
-
    public void testDefaultSecurityValid() throws Exception
    {
       Connection conn = null;
@@ -494,7 +508,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -511,7 +526,8 @@
       }
       finally
       {
-         if (conn != null) conn.close();
+         if (conn != null)
+            conn.close();
       }
    }
 
@@ -532,7 +548,7 @@
          assertTrue(canReadDestination(conn, queue2));
          assertTrue(canWriteDestination(conn, queue2));
 
-         HashSet<Role>  newSecurityConfig = new HashSet<Role>();
+         HashSet<Role> newSecurityConfig = new HashSet<Role>();
          newSecurityConfig.add(new Role("someotherrole", true, true, false));
 
          setSecurityConfig(newSecurityConfig);
@@ -584,11 +600,10 @@
          assertFalse(canReadDestination(conn, queue2));
          assertFalse(canWriteDestination(conn, queue2, false));
 
-
          newSecurityConfig = new HashSet<Role>();
          newSecurityConfig.add(new Role("def", true, false, false));
 
-         configureSecurityForDestination("Queue2", true,  newSecurityConfig);
+         configureSecurityForDestination("Queue2", true, newSecurityConfig);
 
          assertTrue(canReadDestination(conn, queue2));
          assertFalse(canWriteDestination(conn, queue2, false));
@@ -596,16 +611,16 @@
          newSecurityConfig = new HashSet<Role>();
          newSecurityConfig.add(new Role("def", true, true, false));
 
-         configureSecurityForDestination("Queue2", true,  newSecurityConfig);
+         configureSecurityForDestination("Queue2", true, newSecurityConfig);
 
          assertTrue(canReadDestination(conn, queue2));
          assertTrue(canWriteDestination(conn, queue2, false));
 
-         //Now set to null
+         // Now set to null
 
          ServerManagement.configureSecurityForDestination("Queue2", null);
 
-         //Should fall back to the default config
+         // Should fall back to the default config
          HashSet<Role> lockedConf = new HashSet<Role>();
          lockedConf.add(new Role("alien", true, true, true));
 
@@ -643,18 +658,16 @@
          assertTrue(canReadDestination(conn, topic2));
          assertTrue(canWriteDestination(conn, topic2));
 
-
          HashSet<Role> newSecurityConfig = new HashSet<Role>();
-         newSecurityConfig.add(new Role("someotherrole", true, true, false)) ;
+         newSecurityConfig.add(new Role("someotherrole", true, true, false));
 
          configureSecurityForDestination("Topic2", false, newSecurityConfig);
 
          assertFalse(canReadDestination(conn, topic2));
          assertFalse(canWriteDestination(conn, topic2, false));
 
-
          newSecurityConfig = new HashSet<Role>();
-         newSecurityConfig.add(new Role("def", true, false, false)) ;
+         newSecurityConfig.add(new Role("def", true, false, false));
 
          configureSecurityForDestination("Topic2", false, newSecurityConfig);
 
@@ -662,20 +675,20 @@
          assertFalse(canWriteDestination(conn, topic2, false));
 
          newSecurityConfig = new HashSet<Role>();
-         newSecurityConfig.add(new Role("def", true, true, false)) ;
+         newSecurityConfig.add(new Role("def", true, true, false));
 
-         configureSecurityForDestination("Topic2", false,  newSecurityConfig);
+         configureSecurityForDestination("Topic2", false, newSecurityConfig);
 
          assertTrue(canReadDestination(conn, topic2));
          assertTrue(canWriteDestination(conn, topic2, false));
 
-         //Now set to null
+         // Now set to null
 
          configureSecurityForDestination("Topic2", false, null);
 
-         //Should fall back to the default config
+         // Should fall back to the default config
          HashSet<Role> lockedConf = new HashSet<Role>();
-         lockedConf.add(new Role("alien", true, true, true)) ;
+         lockedConf.add(new Role("alien", true, true, true));
          Set<Role> orig = getSecurityConfig();
          setSecurityConfig(lockedConf);
 
@@ -708,7 +721,7 @@
          // configure the queue to allow "def" to read
          HashSet<Role> config = new HashSet<Role>();
          config.add(new Role("def", true, false, false));
-         configureSecurityForDestination("Accounting", true, config );
+         configureSecurityForDestination("Accounting", true, config);
 
          // configure the topic to prevent "def" from reading
          HashSet<Role> config2 = new HashSet<Role>();
@@ -751,7 +764,7 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       oldDefaultConfig = getSecurityConfig();
 
       HashSet<Role> roles = new HashSet<Role>();
@@ -761,7 +774,6 @@
       roles.add(new Role("john", true, false, false));
       configureSecurityForDestination("Queue1", true, roles);
 
-
       HashSet<Role> roles2 = new HashSet<Role>();
       roles2.add(new Role("guest", true, true, true));
       roles2.add(new Role("publisher", true, true, false));
@@ -777,15 +789,15 @@
 
    protected void tearDown() throws Exception
    {
-   	super.tearDown();
-   	
+      super.tearDown();
+
       setSecurityConfig(oldDefaultConfig);
-      configureSecurityForDestination("Queue1", true,  null);
-      configureSecurityForDestination("Queue2", true,  null);
+      configureSecurityForDestination("Queue1", true, null);
+      configureSecurityForDestination("Queue2", true, null);
       configureSecurityForDestination("Topic1", false, null);
       configureSecurityForDestination("Topic2", false, null);
    }
-      
+
    // Private -------------------------------------------------------
 
    private boolean canReadDestination(Connection conn, Destination dest) throws Exception
@@ -808,6 +820,7 @@
       }
 
    }
+
    private boolean canWriteDestination(Connection conn, Destination dest) throws Exception
    {
       boolean transacted = canWriteDestination(conn, dest, true);
@@ -901,15 +914,14 @@
 
    private void testSecurityForTemporaryDestination(boolean isQueue) throws Exception
    {
-      Destination dest = isQueue ? (Destination) queue1 : topic1;
+      Destination dest = isQueue ? (Destination)queue1 : topic1;
 
       Connection conn = cf.createConnection("guest", "guest");
       try
       {
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination temporaryDestination = isQueue
-            ? (Destination) session.createTemporaryQueue()
-            : session.createTemporaryTopic();
+         Destination temporaryDestination = isQueue ? (Destination)session.createTemporaryQueue()
+                                                   : session.createTemporaryTopic();
          Message message = session.createMessage();
          message.setJMSReplyTo(temporaryDestination);
          MessageProducer producer = session.createProducer(dest);
@@ -947,9 +959,6 @@
       }
    }
 
-   
    // Inner classes -------------------------------------------------
 
 }
-
-

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -209,29 +209,29 @@
          }
       }, NUM_THREADS);
    }
-   
-//   public void testM() throws Exception
-//   {
-//      runTestMultipleThreads(new RunnableT()
-//      {
-//         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-//         {
-//            doTestM(sf, threadNum);
-//         }
-//      }, NUM_THREADS);
-//   }
-   
-//   public void testN() throws Exception
-//   {
-//      runTestMultipleThreads(new RunnableT()
-//      {
-//         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-//         {
-//            doTestN(sf, threadNum);
-//         }
-//      }, NUM_THREADS);
-//   }
 
+   // public void testM() throws Exception
+   // {
+   // runTestMultipleThreads(new RunnableT()
+   // {
+   // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+   // {
+   // doTestM(sf, threadNum);
+   // }
+   // }, NUM_THREADS);
+   // }
+
+   public void testN() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestN(sf, threadNum);
+         }
+      }, NUM_THREADS);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -370,7 +370,7 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
-         
+
          if (handler.failure != null)
          {
             throw new Exception("Handler failed: " + handler.failure);
@@ -457,7 +457,7 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
-         
+
          if (handler.failure != null)
          {
             throw new Exception("Handler failed: " + handler.failure);
@@ -562,7 +562,7 @@
          boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          assertTrue(ok);
-         
+
          if (handler.failure != null)
          {
             throw new Exception("Handler failed: " + handler.failure);
@@ -1001,123 +1001,123 @@
 
       s.close();
    }
-   
-   //Browsers
-   //FIXME - this test won't work until we use a proper iterator for browsing a queue.
-   //Making a copy of the queue for a browser consumer doesn't work well with replication since
-   //When replicating the create consumer (browser) to the backup, when executed on the backup the
-   //backup may have different messages in its queue since been added on different threads.
-   //So when replicating deliveries they may not be found.
-   //https://jira.jboss.org/jira/browse/JBMESSAGING-1433
-//   protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
-//   {
-//      long start = System.currentTimeMillis();
-//
-//      ClientSession sessSend = sf.createSession(false, true, true, false);
-//      
-//      ClientSession sessConsume = sf.createSession(false, true, true, false);
-//      
-//      sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
-//
-//      final int numMessages = 100;
-//
-//      ClientProducer producer = sessSend.createProducer(ADDRESS);
-//
-//      sendMessages(sessSend, producer, numMessages, threadNum);
-//      
-//      ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-//                                                          null, false, true);
-//      
-//      Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-//
-//         assertNotNull(msg);
-//
-//         int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-//         int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-//         Integer c = consumerCounts.get(tn);
-//         if (c == null)
-//         {
-//            c = new Integer(cnt);
-//         }
-//
-//         if (cnt != c.intValue())
-//         {
-//            throw new Exception("Invalid count, expected " + c + " got " + cnt);
-//         }
-//         
-//         c++;
-//         
-//         //Wrap
-//         if (c == numMessages)
-//         {
-//            c = 0;
-//         }
-//         
-//         consumerCounts.put(tn, c);
-//
-//         msg.acknowledge();         
-//      }
-//
-//      sessConsume.close();
-//      
-//      sessConsume = sf.createSession(false, true, true, false);
-//      
-//      browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-//                                           null, false, true);
-//      
-//      //Messages should still be there
-//      
-//      consumerCounts.clear();
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-//
-//         assertNotNull(msg);
-//
-//         int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-//         int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-//         Integer c = consumerCounts.get(tn);
-//         if (c == null)
-//         {
-//            c = new Integer(cnt);
-//         }
-//
-//         if (cnt != c.intValue())
-//         {
-//            throw new Exception("Invalid count, expected " + c + " got " + cnt);
-//         }
-//         
-//         c++;
-//         
-//         //Wrap
-//         if (c == numMessages)
-//         {
-//            c = 0;
-//         }
-//         
-//         consumerCounts.put(tn, c);
-//
-//         msg.acknowledge();         
-//      }
-//      
-//      sessConsume.close();
-//      
-//      sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
-//      
-//      sessSend.close();
-//
-//      long end = System.currentTimeMillis();
-//
-//      log.info("duration " + (end - start));
-//   }
-      
+
+   // Browsers
+   // FIXME - this test won't work until we use a proper iterator for browsing a queue.
+   // Making a copy of the queue for a browser consumer doesn't work well with replication since
+   // When replicating the create consumer (browser) to the backup, when executed on the backup the
+   // backup may have different messages in its queue since been added on different threads.
+   // So when replicating deliveries they may not be found.
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
+   // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
+   // {
+   // long start = System.currentTimeMillis();
+   //
+   // ClientSession sessSend = sf.createSession(false, true, true, false);
+   //      
+   // ClientSession sessConsume = sf.createSession(false, true, true, false);
+   //      
+   // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
+   //
+   // final int numMessages = 100;
+   //
+   // ClientProducer producer = sessSend.createProducer(ADDRESS);
+   //
+   // sendMessages(sessSend, producer, numMessages, threadNum);
+   //      
+   // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+   // null, false, true);
+   //      
+   // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
+   //      
+   // for (int i = 0; i < numMessages; i++)
+   // {
+   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+   //
+   // assertNotNull(msg);
+   //
+   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+   //
+   // Integer c = consumerCounts.get(tn);
+   // if (c == null)
+   // {
+   // c = new Integer(cnt);
+   // }
+   //
+   // if (cnt != c.intValue())
+   // {
+   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+   // }
+   //         
+   // c++;
+   //         
+   // //Wrap
+   // if (c == numMessages)
+   // {
+   // c = 0;
+   // }
+   //         
+   // consumerCounts.put(tn, c);
+   //
+   // msg.acknowledge();
+   // }
+   //
+   // sessConsume.close();
+   //      
+   // sessConsume = sf.createSession(false, true, true, false);
+   //      
+   // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+   // null, false, true);
+   //      
+   // //Messages should still be there
+   //      
+   // consumerCounts.clear();
+   //      
+   // for (int i = 0; i < numMessages; i++)
+   // {
+   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+   //
+   // assertNotNull(msg);
+   //
+   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+   //
+   // Integer c = consumerCounts.get(tn);
+   // if (c == null)
+   // {
+   // c = new Integer(cnt);
+   // }
+   //
+   // if (cnt != c.intValue())
+   // {
+   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+   // }
+   //         
+   // c++;
+   //         
+   // //Wrap
+   // if (c == numMessages)
+   // {
+   // c = 0;
+   // }
+   //         
+   // consumerCounts.put(tn, c);
+   //
+   // msg.acknowledge();
+   // }
+   //      
+   // sessConsume.close();
+   //      
+   // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
+   //      
+   // sessSend.close();
+   //
+   // long end = System.currentTimeMillis();
+   //
+   // log.info("duration " + (end - start));
+   // }
+
    protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
    {
       ClientSession sessCreate = sf.createSession(false, true, true, false);
@@ -1125,11 +1125,11 @@
       sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
 
       ClientSession sess = sf.createSession(false, true, true, false);
-      
+
       sess.stop();
 
       sess.start();
-      
+
       sess.stop();
 
       ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
@@ -1144,7 +1144,7 @@
       message.getBody().flip();
 
       producer.send(message);
-      
+
       sess.start();
 
       ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
@@ -1152,9 +1152,9 @@
       assertNotNull(message2);
 
       message2.acknowledge();
-      
+
       sess.stop();
-      
+
       sess.start();
 
       sess.close();
@@ -1166,7 +1166,7 @@
 
    protected int getNumIterations()
    {
-      return 1000;
+      return 10;
    }
 
    protected void setUp() throws Exception
@@ -1182,11 +1182,11 @@
    {
       log.info("************* Ending test " + this.getName());
 
-      if(liveService != null && liveService.isStarted())
+      if (liveService != null && liveService.isStarted())
       {
          liveService.stop();
       }
-      if(backupService != null && backupService.isStarted())
+      if (backupService != null && backupService.isStarted())
       {
          backupService.stop();
       }
@@ -1351,7 +1351,7 @@
 
    private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages) throws Exception
    {
-      //We make sure the messages arrive in the order they were sent from a particular producer
+      // We make sure the messages arrive in the order they were sent from a particular producer
       Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
 
       for (int i = 0; i < numMessages; i++)
@@ -1383,15 +1383,15 @@
             {
                throw new Exception("Invalid count, expected " + c + " got " + cnt);
             }
-            
+
             c++;
-            
-            //Wrap
+
+            // Wrap
             if (c == numMessages)
             {
                c = 0;
             }
-            
+
             consumerCounts.put(tn, c);
 
             msg.acknowledge();
@@ -1469,15 +1469,15 @@
       volatile String failure;
 
       final int tn;
-      
+
       final int numMessages;
-      
+
       volatile boolean done;
 
       MyHandler(final int threadNum, final int numMessages)
       {
          this.tn = threadNum;
-         
+
          this.numMessages = numMessages;
       }
 
@@ -1491,7 +1491,7 @@
          {
             log.error("Failed to process", me);
          }
-         
+
          if (done)
          {
             return;
@@ -1505,9 +1505,9 @@
          {
             c = new Integer(cnt);
          }
-         
-         //log.info("got message " + threadNum + "-" + cnt);
 
+         // log.info("got message " + threadNum + "-" + cnt);
+
          if (cnt != c.intValue())
          {
             failure = "Invalid count, expected " + c + " got " + cnt;
@@ -1515,7 +1515,7 @@
 
             latch.countDown();
          }
-         
+
          if (tn == threadNum && c == numMessages - 1)
          {
             done = true;
@@ -1523,15 +1523,14 @@
          }
 
          c++;
-         //Wrap around at 100
+         // Wrap around at 100
          if (c == 100)
          {
             c = 0;
          }
-         
+
          counts.put(threadNum, c);
 
-         
       }
    }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-21 10:39:09 UTC (rev 5162)
@@ -76,7 +76,7 @@
       return addresses.contains(address);
    }
 
-   public Binding getBinding(SimpleString queueName) throws Exception
+   public Binding getBinding(SimpleString queueName)
    {
       return bindings.get(queueName);
    }




More information about the jboss-cvs-commits mailing list