[jboss-cvs] JBoss Messaging SVN: r6049 - in trunk: src/main/org/jboss/messaging/core/management/impl and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 9 10:53:42 EDT 2009


Author: timfox
Date: 2009-03-09 10:53:41 -0400 (Mon, 09 Mar 2009)
New Revision: 6049

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/utils/TypedProperties.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java
Log:
clustered backup

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -59,7 +59,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -270,7 +270,7 @@
    {
       checkClosed();
 
-      SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temp);
+      CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
 
       channel.sendBlocking(request);
    }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -551,7 +551,7 @@
             TypedProperties notifProps;
             if (notification.getProperties() != null)
             {
-               notifProps = notification.getProperties();
+               notifProps = new TypedProperties(notification.getProperties());
             }
             else
             {

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -224,6 +224,10 @@
       }
 
       addressManager.clear();
+      
+      queueInfos.clear();
+      
+      transientIDs.clear();
 
       started = false;
    }
@@ -361,7 +365,7 @@
                                                                                                  .toString());
 
                      long redistributionDelay = addressSettings.getRedistributionDelay();
-
+                     
                      if (redistributionDelay != -1)
                      {
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -431,7 +435,7 @@
                                                                                                  .toString());
 
                      long redistributionDelay = addressSettings.getRedistributionDelay();
-
+                     
                      if (redistributionDelay != -1)
                      {
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -441,6 +445,10 @@
 
                break;
             }
+            default:
+            {
+               throw new IllegalArgumentException("Invalid type " + type.toInt());
+            }
 
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -86,7 +86,6 @@
 
    public void clear()
    {
-    //  destinations.clear();
       nameMap.clear();
       mappings.clear();
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -31,8 +31,8 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -106,7 +106,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -678,12 +678,12 @@
             packet = new SessionQueueQueryResponseMessage();
             break;
          }
-         case SESS_CREATE_QUEUE:
+         case CREATE_QUEUE:
          {
-            packet = new SessionCreateQueueMessage();
+            packet = new CreateQueueMessage();
             break;
          }
-         case SESS_DELETE_QUEUE:
+         case DELETE_QUEUE:
          {
             packet = new SessionDeleteQueueMessage();
             break;
@@ -888,6 +888,8 @@
       private CommandConfirmationHandler commandConfirmationHandler;
 
       private int responseActionCount;
+      
+      private boolean playedResponsesOnFailure;
 
       public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
       {
@@ -1158,6 +1160,19 @@
 
       public void executeOutstandingDelayedResults()
       {
+         //Execute on different thread to avoid deadlock
+         
+         new Thread()
+         {
+            public void run()
+            {
+               doExecuteOutstandingDelayedResults();
+            }
+         }.start();
+      }
+      
+      private void doExecuteOutstandingDelayedResults()
+      {
          synchronized (replicationLock)
          {
             // Execute all the response actions now
@@ -1328,8 +1343,6 @@
          }
       }
       
-      private boolean playedResponsesOnFailure;
-
       // This will never get called concurrently by more than one thread
 
       // TODO it's not ideal synchronizing this since it forms a contention point with replication
@@ -1454,8 +1467,11 @@
             if (packet.isResponse())
             {
                response = packet;
+               
                confirm(packet);
+               
                lock.lock();
+               
                try
                {
                   sendCondition.signal();

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java (from rev 6035, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,147 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+
+ * @version <tt>$Revision$</tt>
+ */
+public class CreateQueueMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString address;
+   private SimpleString queueName;
+   private SimpleString filterString;
+   private boolean durable;
+   private boolean temporary;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public CreateQueueMessage(final SimpleString address, final SimpleString queueName,
+   		final SimpleString filterString, final boolean durable, final boolean temporary)
+   {
+      super(CREATE_QUEUE);
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.temporary = temporary;
+   }
+   
+   public CreateQueueMessage()
+   {
+      super(CREATE_QUEUE);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", address=" + address);
+      buff.append(", queueName=" + queueName);
+      buff.append(", filterString=" + filterString);
+      buff.append(", durable=" + durable);
+      buff.append(", temporary=" + temporary);
+      buff.append("]");
+      return buff.toString();
+   }
+   
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public SimpleString getQueueName()
+   {
+      return queueName;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+   
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+    
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(address);
+      buffer.writeSimpleString(queueName);
+      buffer.writeNullableSimpleString(filterString);
+      buffer.writeBoolean(durable);
+      buffer.writeBoolean(temporary);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      address = buffer.readSimpleString();
+      queueName = buffer.readSimpleString();
+      filterString = buffer.readNullableSimpleString();
+      durable = buffer.readBoolean();
+      temporary = buffer.readBoolean();
+   }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof CreateQueueMessage == false)
+      {
+         return false;
+      }
+            
+      CreateQueueMessage r = (CreateQueueMessage)other;
+      
+      return super.equals(other) && r.address.equals(this.address) && 
+             r.queueName.equals(this.queueName) &&
+             (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
+             r.durable == this.durable &&
+             r.temporary == this.temporary;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -67,7 +67,12 @@
    public static final byte REATTACH_SESSION_RESP = 33;
 
    public static final byte REPLICATE_CREATESESSION = 34;
+   
+   public static final byte CREATE_QUEUE = 35;
 
+   public static final byte DELETE_QUEUE = 36;
+
+
    // Session
    public static final byte SESS_CREATECONSUMER = 40;
 
@@ -83,10 +88,6 @@
 
    public static final byte SESS_QUEUEQUERY_RESP = 46;
 
-   public static final byte SESS_CREATE_QUEUE = 47;
-
-   public static final byte SESS_DELETE_QUEUE = 48;
-
    public static final byte SESS_BINDINGQUERY = 49;
 
    public static final byte SESS_BINDINGQUERY_RESP = 50;
@@ -145,12 +146,18 @@
    
    //Replication
 
-   public static final byte SESS_REPLICATE_DELIVERY = 91;
+   public static final byte SESS_REPLICATE_DELIVERY = 90;
    
-   public static final byte REPLICATE_UPDATE_CONNECTORS = 92;
+   public static final byte REPLICATE_ADD_REMOTE_QUEUE_BINDING = 91;
    
-   public static final byte REPLICATE_NOTIFICATION = 93;
+   public static final byte REPLICATE_REMOVE_REMOTE_QUEUE_BINDING = 92;
    
+   public static final byte REPLICATE_ADD_REMOTE_CONSUMER = 93;
+   
+   public static final byte REPLICATE_REMOVE_REMOTE_CONSUMER = 94;
+   
+   public static final byte REPLICATE_ACKNOWLEDGE = 95;
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -1,147 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
-
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateQueueMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private SimpleString address;
-   private SimpleString queueName;
-   private SimpleString filterString;
-   private boolean durable;
-   private boolean temporary;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionCreateQueueMessage(final SimpleString address, final SimpleString queueName,
-   		final SimpleString filterString, final boolean durable, final boolean temporary)
-   {
-      super(SESS_CREATE_QUEUE);
-
-      this.address = address;
-      this.queueName = queueName;
-      this.filterString = filterString;
-      this.durable = durable;
-      this.temporary = temporary;
-   }
-   
-   public SessionCreateQueueMessage()
-   {
-      super(SESS_CREATE_QUEUE);
-   }
-
-   // Public --------------------------------------------------------
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buff = new StringBuffer(getParentString());
-      buff.append(", address=" + address);
-      buff.append(", queueName=" + queueName);
-      buff.append(", filterString=" + filterString);
-      buff.append(", durable=" + durable);
-      buff.append(", temporary=" + temporary);
-      buff.append("]");
-      return buff.toString();
-   }
-   
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public SimpleString getQueueName()
-   {
-      return queueName;
-   }
-
-   public SimpleString getFilterString()
-   {
-      return filterString;
-   }
-
-   public boolean isDurable()
-   {
-      return durable;
-   }
-   
-   public boolean isTemporary()
-   {
-      return temporary;
-   }
-    
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.writeSimpleString(address);
-      buffer.writeSimpleString(queueName);
-      buffer.writeNullableSimpleString(filterString);
-      buffer.writeBoolean(durable);
-      buffer.writeBoolean(temporary);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      address = buffer.readSimpleString();
-      queueName = buffer.readSimpleString();
-      filterString = buffer.readNullableSimpleString();
-      durable = buffer.readBoolean();
-      temporary = buffer.readBoolean();
-   }
-   
-   public boolean equals(Object other)
-   {
-      if (other instanceof SessionCreateQueueMessage == false)
-      {
-         return false;
-      }
-            
-      SessionCreateQueueMessage r = (SessionCreateQueueMessage)other;
-      
-      return super.equals(other) && r.address.equals(this.address) && 
-             r.queueName.equals(this.queueName) &&
-             (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
-             r.durable == this.durable &&
-             r.temporary == this.temporary;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -44,14 +44,14 @@
 
    public SessionDeleteQueueMessage(final SimpleString queueName)
    {
-      super(SESS_DELETE_QUEUE);
+      super(DELETE_QUEUE);
 
       this.queueName = queueName;
    }
    
    public SessionDeleteQueueMessage()
    {
-      super(SESS_DELETE_QUEUE);
+      super(DELETE_QUEUE);
    }
 
    // Public --------------------------------------------------------

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * A ReplicateAcknowledgeMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateAcknowledgeMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+      
+   //TODO - use queue id not name for smaller packet size
+   private SimpleString uniqueName;
+   
+   private long messageID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicateAcknowledgeMessage(final SimpleString uniqueName, final long messageID)
+   {
+      super(REPLICATE_ACKNOWLEDGE);
+
+      this.uniqueName = uniqueName;
+      
+      this.messageID = messageID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public ReplicateAcknowledgeMessage()
+   {
+      super(REPLICATE_ACKNOWLEDGE);
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(uniqueName);
+      buffer.writeLong(messageID);
+   }
+
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      uniqueName = buffer.readSimpleString();
+      messageID = buffer.readLong();
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+   
+   public long getMessageID()
+   {
+      return messageID;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -1,138 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.Pair;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * 
- * A ReplicateClusterConnectionUpdate
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 4 Mar 2009 17:46:53
- *
- *
- */
-public class ReplicateClusterConnectionUpdate extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private SimpleString clusterConnectionName;
-   
-   private List<Pair<TransportConfiguration, TransportConfiguration>> connectors;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ReplicateClusterConnectionUpdate(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors)
-   {
-      super(REPLICATE_UPDATE_CONNECTORS);
-
-      this.clusterConnectionName = clusterConnectionName;
-      
-      this.connectors = connectors;
-   }
-
-   public ReplicateClusterConnectionUpdate()
-   {
-      super(REPLICATE_UPDATE_CONNECTORS);
-   }
-
-   // Public --------------------------------------------------------
-
-   public SimpleString getClusterConnectionName()
-   {
-      return clusterConnectionName;
-   }
-   
-   public List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
-   {
-      return connectors;
-   }
-
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.writeSimpleString(clusterConnectionName);
-      
-      buffer.writeInt(connectors.size());
-
-      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
-      {
-         connectorPair.a.encode(buffer);
-
-         if (connectorPair.b != null)
-         {
-            buffer.writeBoolean(true);
-
-            connectorPair.b.encode(buffer);
-         }
-         else
-         {
-            buffer.writeBoolean(false);
-         }
-      }
-   }
-
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      clusterConnectionName = buffer.readSimpleString();
-      
-      int size = buffer.readInt();
-
-      for (int i = 0; i < size; i++)
-      {
-         TransportConfiguration connector = new TransportConfiguration();
-
-         connector.decode(buffer);
-
-         boolean existsBackup = buffer.readBoolean();
-
-         TransportConfiguration backupConnector = null;
-
-         if (existsBackup)
-         {
-            backupConnector = new TransportConfiguration();
-
-            backupConnector.decode(buffer);
-         }
-
-         Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
-                                                                                                                                       backupConnector);
-
-         connectors.add(connectorPair);
-      }
-   }
-
-   public boolean isRequiresConfirmations()
-   {
-      return false;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * A ReplicateRemoteBindingAddedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteBindingAddedMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString clusterConnectionName;
+   
+   private SimpleString address;
+
+   private SimpleString uniqueName;
+   
+   private SimpleString routingName;
+
+   private int remoteQueueID;
+
+   private SimpleString filterString;
+
+   private SimpleString sfQueueName;
+
+   private int distance;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicateRemoteBindingAddedMessage(SimpleString clusterConnectionName,
+                                             SimpleString address,
+                                             SimpleString uniqueName,
+                                             SimpleString routingName,
+                                             int remoteQueueID,
+                                             SimpleString filterString,
+                                             SimpleString sfQueueName,                                       
+                                             int distance)
+   {
+      super(REPLICATE_ADD_REMOTE_QUEUE_BINDING);
+
+      this.clusterConnectionName = clusterConnectionName;
+      this.address = address;
+      this.uniqueName = uniqueName;
+      this.routingName = routingName;
+      this.remoteQueueID = remoteQueueID;
+      this.filterString = filterString;
+      this.sfQueueName = sfQueueName;
+      this.distance = distance;
+   }
+
+   // Public --------------------------------------------------------
+
+   public ReplicateRemoteBindingAddedMessage()
+   {
+      super(REPLICATE_ADD_REMOTE_QUEUE_BINDING);
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(clusterConnectionName);
+      buffer.writeSimpleString(address);
+      buffer.writeSimpleString(uniqueName);
+      buffer.writeSimpleString(routingName);
+      buffer.writeInt(remoteQueueID);
+      buffer.writeNullableSimpleString(filterString);
+      buffer.writeSimpleString(sfQueueName);
+      buffer.writeInt(distance);
+   }
+
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      clusterConnectionName = buffer.readSimpleString();
+      address = buffer.readSimpleString();
+      uniqueName = buffer.readSimpleString();
+      routingName = buffer.readSimpleString();
+      remoteQueueID = buffer.readInt();
+      filterString = buffer.readNullableSimpleString();
+      sfQueueName = buffer.readSimpleString();
+      distance = buffer.readInt();
+   }
+   
+   public SimpleString getClusterConnectionName()
+   {
+      return clusterConnectionName;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+   
+   public SimpleString getRoutingName()
+   {
+      return routingName;
+   }
+
+   public int getRemoteQueueID()
+   {
+      return remoteQueueID;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   public SimpleString getSfQueueName()
+   {
+      return sfQueueName;
+   }
+
+   public int getDistance()
+   {
+      return distance;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * A ReplicateRemoteBindingRemovedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteBindingRemovedMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+      
+   private SimpleString uniqueName;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicateRemoteBindingRemovedMessage(SimpleString uniqueName)
+   {
+      super(REPLICATE_REMOVE_REMOTE_QUEUE_BINDING);
+
+      this.uniqueName = uniqueName;
+   }
+
+   // Public --------------------------------------------------------
+
+   public ReplicateRemoteBindingRemovedMessage()
+   {
+      super(REPLICATE_REMOVE_REMOTE_QUEUE_BINDING);
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(uniqueName);
+   }
+
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      uniqueName = buffer.readSimpleString();
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * A ReplicateRemoteConsumerAddedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteConsumerAddedMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString uniqueBindingName;
+   
+   private SimpleString filterString;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicateRemoteConsumerAddedMessage(SimpleString uniqueBindingName, SimpleString filterString)
+   {
+      super(REPLICATE_ADD_REMOTE_CONSUMER);
+
+      this.uniqueBindingName = uniqueBindingName;
+      
+      this.filterString = filterString;
+   }
+
+   // Public --------------------------------------------------------
+
+   public ReplicateRemoteConsumerAddedMessage()
+   {
+      super(REPLICATE_ADD_REMOTE_CONSUMER);
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(uniqueBindingName);
+      
+      buffer.writeNullableSimpleString(filterString);
+   }
+
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      uniqueBindingName = buffer.readSimpleString();
+      
+      filterString = buffer.readNullableSimpleString();
+   }
+
+   public SimpleString getUniqueBindingName()
+   {
+      return uniqueBindingName;
+   }
+   
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * A ReplicateRemoteConsumerRemovedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteConsumerRemovedMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString uniqueBindingName;
+   
+   private SimpleString filterString;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicateRemoteConsumerRemovedMessage(SimpleString uniqueBindingName, SimpleString filterString)
+   {
+      super(REPLICATE_REMOVE_REMOTE_CONSUMER);
+
+      this.uniqueBindingName = uniqueBindingName;
+      
+      this.filterString = filterString;
+   }
+
+   // Public --------------------------------------------------------
+
+   public ReplicateRemoteConsumerRemovedMessage()
+   {
+      super(REPLICATE_REMOVE_REMOTE_CONSUMER);
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeSimpleString(uniqueBindingName);
+      
+      buffer.writeNullableSimpleString(filterString);
+   }
+
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      uniqueBindingName = buffer.readSimpleString();
+      
+      filterString = buffer.readNullableSimpleString();
+   }
+
+   public SimpleString getUniqueBindingName()
+   {
+      return uniqueBindingName;
+   }
+   
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -85,26 +85,24 @@
                                               boolean xa,
                                               int sendWindowSize) throws Exception;
 
-   CreateSessionResponseMessage replicateCreateSession(String name,
-                                                       long channelID,
-                                                       long originalSessionID,
-                                                       String username,
-                                                       String password,
-                                                       int minLargeMessageSize,
-                                                       int incrementingVersion,
-                                                       RemotingConnection remotingConnection,
-                                                       boolean autoCommitSends,
-                                                       boolean autoCommitAcks,
-                                                       boolean preAcknowledge,
-                                                       boolean xa,
-                                                       int sendWindowSize) throws Exception;
-   
-   void updateClusterConnectionConnectors(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
-   
+   void replicateCreateSession(String name,
+                               long channelID,
+                               long originalSessionID,
+                               String username,
+                               String password,
+                               int minLargeMessageSize,
+                               int incrementingVersion,
+                               RemotingConnection remotingConnection,
+                               boolean autoCommitSends,
+                               boolean autoCommitAcks,
+                               boolean preAcknowledge,
+                               boolean xa,
+                               int sendWindowSize) throws Exception;
+
    void removeSession(String name) throws Exception;
-   
+
    ServerSession getSession(String name);
-   
+
    Set<ServerSession> getSessions();
 
    boolean isStarted();
@@ -122,14 +120,14 @@
    ResourceManager getResourceManager();
 
    List<ServerSession> getSessions(String connectionID);
-   
+
    ClusterManager getClusterManager();
-   
+
    QueueFactory getQueueFactory();
-   
+
    SimpleString getNodeID();
-   
+
    UUID getUUID();
-   
+
    Channel getReplicatingChannel();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -32,7 +32,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -114,7 +114,7 @@
 
    void handleStop(Packet packet);
 
-   void handleCreateQueue(SessionCreateQueueMessage packet);
+   void handleCreateQueue(CreateQueueMessage packet);
 
    void handleDeleteQueue(SessionDeleteQueueMessage packet);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -48,5 +48,7 @@
 
    Transformer getTransformer();
 
-   boolean isUseDuplicateDetection();   
+   boolean isUseDuplicateDetection();  
+   
+   void activate();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -47,4 +47,6 @@
    int size();
 
    void broadcastConnectors() throws Exception;
+   
+   void activate();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -20,14 +20,9 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.server.cluster;
 
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -42,6 +37,14 @@
 public interface ClusterConnection extends MessagingComponent
 {
    SimpleString getName();
+
+   void handleReplicatedAddBinding(SimpleString address,
+                                   SimpleString uniqueName,
+                                   SimpleString routingName,
+                                   int queueID,
+                                   SimpleString filterString,
+                                   SimpleString queueName,                 
+                                   int distance) throws Exception;
    
-   void handleReplicatedUpdateConnectors(List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
+   void activate();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -43,4 +43,6 @@
    Set<ClusterConnection> getClusterConnections();
 
    ClusterConnection getClusterConnection(SimpleString name);
+   
+   void activate();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -64,8 +64,11 @@
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
@@ -147,6 +150,10 @@
 
    private final String clusterPassword;
 
+   private Channel replicatingChannel;
+   
+   private boolean activated;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -169,7 +176,9 @@
                      final boolean useDuplicateDetection,
                      final SimpleString managementAddress,
                      final SimpleString managementNotificationAddress,
-                     final String clusterPassword) throws Exception
+                     final String clusterPassword,
+                     final Channel replicatingChannel,
+                     final boolean activated) throws Exception
    {
       this(nodeUUID,
            name,
@@ -188,7 +197,9 @@
            managementAddress,
            managementNotificationAddress,
            clusterPassword,
-           null);
+           null,
+           replicatingChannel,
+           activated);
    }
 
    public BridgeImpl(final UUID nodeUUID,
@@ -208,7 +219,9 @@
                      final SimpleString managementAddress,
                      final SimpleString managementNotificationAddress,
                      final String clusterPassword,
-                     final MessageFlowRecord flowRecord) throws Exception
+                     final MessageFlowRecord flowRecord,
+                     final Channel replicatingChannel,
+                     final boolean activated) throws Exception
    {
       this.nodeUUID = nodeUUID;
 
@@ -254,6 +267,10 @@
       this.clusterPassword = clusterPassword;
 
       this.flowRecord = flowRecord;
+
+      this.replicatingChannel = replicatingChannel;
+      
+      this.activated = activated;            
    }
 
    public synchronized void start() throws Exception
@@ -265,7 +282,10 @@
 
       started = true;
 
-      executor.execute(new CreateObjectsRunnable());
+      if (activated)
+      {
+         executor.execute(new CreateObjectsRunnable());
+      }
    }
 
    private void cancelRefs() throws Exception
@@ -304,89 +324,214 @@
       this.waitForRunnablesToComplete();
    }
 
-   private class StopRunnable implements Runnable
+   public boolean isStarted()
    {
-      public void run()
-      {
-         try
-         {
-            synchronized (BridgeImpl.this)
-            {
-               if (!started)
-               {
-                  return;
-               }
+      return started;
+   }
 
-               if (session != null)
-               {
-                  session.close();
-               }
+   public synchronized void activate()
+   {
+      replicatingChannel = null;
+      
+      activated = true;
 
-               started = false;
+      executor.execute(new CreateObjectsRunnable());
+   }
 
-               active = false;
+   public SimpleString getName()
+   {
+      return name;
+   }
 
-            }
+   public Queue getQueue()
+   {
+      return queue;
+   }
 
-            queue.removeConsumer(BridgeImpl.this);
+   public Filter getFilter()
+   {
+      return filter;
+   }
 
-            cancelRefs();
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to stop bridge", e);
-         }
+   public SimpleString getForwardingAddress()
+   {
+      return forwardingAddress;
+   }
+
+   public Transformer getTransformer()
+   {
+      return transformer;
+   }
+
+   public boolean isUseDuplicateDetection()
+   {
+      return useDuplicateDetection;
+   }
+
+   // For testing only
+   public RemotingConnection getForwardingConnection()
+   {
+      if (session == null)
+      {
+         return null;
       }
+      else
+      {
+         return ((ClientSessionImpl)session).getConnection();
+      }
    }
 
-   private class FailRunnable implements Runnable
+   // SendAcknowledgementHandler implementation ---------------------
+
+   public void sendAcknowledged(final Message message)
    {
-      public void run()
+      try
       {
-         synchronized (BridgeImpl.this)
+         final MessageReference ref = refs.poll();
+
+         if (ref != null)
          {
-            if (!started)
+            if (replicatingChannel == null)
             {
-               return;
+               // Acknowledge when we know send has been processed on the server
+               ref.getQueue().acknowledge(ref);
             }
+            else
+            {
+               Packet packet = new ReplicateAcknowledgeMessage(name, ref.getMessage().getMessageID());
 
-            if (flowRecord != null)
+               replicatingChannel.replicatePacket(packet, 2, new Runnable()
+               {
+                  public void run()
+                  {
+                     try
+                     {
+                        ref.getQueue().acknowledge(ref);
+                     }
+                     catch (Exception e)
+                     {
+                        log.info("Failed to ack", e);
+                     }
+                  }
+               });
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         log.info("Failed to ack", e);
+      }
+   }
+
+   // Consumer implementation ---------------------------------------
+
+   public HandleStatus handle(final MessageReference ref) throws Exception
+   {
+      if (filter != null && !filter.match(ref.getMessage()))
+      {
+         return HandleStatus.NO_MATCH;
+      }
+
+      if (!active)
+      {
+         return HandleStatus.BUSY;
+      }
+
+      synchronized (this)
+      {
+         ref.getQueue().referenceHandled();
+
+         ServerMessage message = ref.getMessage();
+
+         refs.add(ref);
+
+         if (flowRecord != null)
+         {
+            // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
+            // only
+            // the one pertinent for the destination node - this is important since different queues on different
+            // nodes could have same queue ids
+            // Note we must copy since same message may get routed to other nodes which require different headers
+            message = message.copy();
+
+            // TODO - we can optimise this
+
+            Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+
+            byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
+
+            for (SimpleString propName : propNames)
             {
-               try
+               if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
                {
-                  flowRecord.reset();
+                  message.removeProperty(propName);
                }
-               catch (Exception e)
-               {
-                  log.error("Failed to reset", e);
-               }
             }
 
-            active = false;
+            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+
+            message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
          }
 
-         try
+         if (useDuplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
          {
-            queue.removeConsumer(BridgeImpl.this);
+            // If we are using duplicate detection and there's not already a duplicate detection header, then
+            // we add a header composed of the persistent node id and the message id, which makes it globally unique
+            // between restarts.
+            // If you use a cluster connection then a guid based duplicate id will be used since it is added *before*
+            // the
+            // message goes into the store and forward queue.
+            // But with this technique it also works when the messages don't already have such a header in them in the
+            // queue.
+            byte[] bytes = new byte[24];
 
-            session.cleanUp();
+            ByteBuffer bb = ByteBuffer.wrap(bytes);
 
-            cancelRefs();
+            bb.put(nodeUUID.asBytes());
 
-            csf.close();
+            bb.putLong(message.getMessageID());
+
+            message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
          }
-         catch (Exception e)
+
+         if (transformer != null)
          {
-            log.error("Failed to stop", e);
+            message = transformer.transform(message);
          }
 
-         if (!createObjects())
+         SimpleString dest;
+
+         if (forwardingAddress != null)
          {
-            started = false;
+            dest = forwardingAddress;
          }
+         else
+         {
+            // Preserve the original address
+            dest = message.getDestination();
+         }
+
+         producer.send(dest, message);
+
+         return HandleStatus.HANDLED;
       }
    }
 
+   // FailureListener implementation --------------------------------
+
+   public boolean connectionFailed(final MessagingException me)
+   {
+      fail();
+
+      return true;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
    private void waitForRunnablesToComplete()
    {
       // Wait for any create objects runnable to complete
@@ -532,183 +677,91 @@
       }
    }
 
-   public boolean isStarted()
-   {
-      return started;
-   }
+   // Inner classes -------------------------------------------------
 
-   public SimpleString getName()
+   private class StopRunnable implements Runnable
    {
-      return name;
-   }
+      public void run()
+      {
+         try
+         {
+            synchronized (BridgeImpl.this)
+            {
+               if (!started)
+               {
+                  return;
+               }
 
-   public Queue getQueue()
-   {
-      return queue;
-   }
+               if (session != null)
+               {
+                  session.close();
+               }
 
-   public Filter getFilter()
-   {
-      return filter;
-   }
+               started = false;
 
-   public SimpleString getForwardingAddress()
-   {
-      return forwardingAddress;
-   }
+               active = false;
 
-   public Transformer getTransformer()
-   {
-      return transformer;
-   }
+            }
 
-   public boolean isUseDuplicateDetection()
-   {
-      return useDuplicateDetection;
-   }
+            queue.removeConsumer(BridgeImpl.this);
 
-   // For testing only
-   public RemotingConnection getForwardingConnection()
-   {
-      if (session == null)
-      {
-         return null;
-      }
-      else
-      {
-         return ((ClientSessionImpl)session).getConnection();
-      }
-   }
-
-   // SendAcknowledgementHandler implementation ---------------------
-
-   public void sendAcknowledged(final Message message)
-   {
-      try
-      {
-         MessageReference ref = refs.poll();
-
-         if (ref != null)
+            cancelRefs();
+         }
+         catch (Exception e)
          {
-            // Acknowledge when we know send has been processed on the server
-            ref.getQueue().acknowledge(ref);
+            log.error("Failed to stop bridge", e);
          }
       }
-      catch (Exception e)
-      {
-         log.info("Failed to ack", e);
-      }
    }
 
-   // Consumer implementation ---------------------------------------
-
-   public HandleStatus handle(final MessageReference ref) throws Exception
+   private class FailRunnable implements Runnable
    {
-      if (filter != null && !filter.match(ref.getMessage()))
+      public void run()
       {
-         return HandleStatus.NO_MATCH;
-      }
-
-      if (!active)
-      {
-         return HandleStatus.BUSY;
-      }
-
-      synchronized (this)
-      {
-         ref.getQueue().referenceHandled();
-
-         ServerMessage message = ref.getMessage();
-
-         refs.add(ref);
-         
-         if (flowRecord != null)
+         synchronized (BridgeImpl.this)
          {
-            // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
-            // only
-            // the one pertinent for the destination node - this is important since different queues on different
-            // nodes could have same queue ids
-            // Note we must copy since same message may get routed to other nodes which require different headers
-            message = message.copy();
+            if (!started)
+            {
+               return;
+            }
 
-            // TODO - we can optimise this
-
-            Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
-
-            byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
-
-            for (SimpleString propName : propNames)
+            if (flowRecord != null)
             {
-               if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+               try
                {
-                  message.removeProperty(propName);
+                  flowRecord.reset();
                }
+               catch (Exception e)
+               {
+                  log.error("Failed to reset", e);
+               }
             }
 
-            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
-
-            message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
+            active = false;
          }
 
-         if (useDuplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
+         try
          {
-            //If we are using duplicate detection and there's not already a duplicate detection header, then
-            //we add a header composed of the persistent node id and the message id, which makes it globally unique
-            //between restarts.
-            //If you use a cluster connection then a guid based duplicate id will be used since it is added *before* the
-            //message goes into the store and forward queue.
-            //But with this technique it also works when the messages don't already have such a header in them in the queue.
-            byte[] bytes = new byte[24];
+            queue.removeConsumer(BridgeImpl.this);
 
-            ByteBuffer bb = ByteBuffer.wrap(bytes);
+            session.cleanUp();
 
-            bb.put(nodeUUID.asBytes());
+            cancelRefs();
 
-            bb.putLong(message.getMessageID());
-
-            message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
+            csf.close();
          }
-
-         if (transformer != null)
+         catch (Exception e)
          {
-            message = transformer.transform(message);
+            log.error("Failed to stop", e);
          }
 
-         SimpleString dest;
-
-         if (forwardingAddress != null)
+         if (!createObjects())
          {
-            dest = forwardingAddress;
+            started = false;
          }
-         else
-         {
-            // Preserve the original address
-            dest = message.getDestination();
-         }
-
-         producer.send(dest, message);
-         
-         return HandleStatus.HANDLED;
       }
    }
 
-   // FailureListener implementation --------------------------------
-
-   public boolean connectionFailed(final MessagingException me)
-   {
-      fail();
-
-      return true;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
    private class CreateObjectsRunnable implements Runnable
    {
       public synchronized void run()

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -22,7 +22,6 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
-import java.io.ByteArrayOutputStream;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
@@ -66,12 +65,15 @@
    private boolean started;
 
    private ScheduledFuture<?> future;
+   
+   private boolean active;
 
    public BroadcastGroupImpl(final String nodeID,
                              final String name,
                              final int localPort,
                              final InetAddress groupAddress,
-                             final int groupPort) throws Exception
+                             final int groupPort,
+                             final boolean active) throws Exception
    {
       this.nodeID = nodeID;
 
@@ -82,6 +84,10 @@
       this.groupAddress = groupAddress;
 
       this.groupPort = groupPort;
+      
+      this.active = active;
+      
+      log.info("created broadcast group active "+ active);
    }
 
    public synchronized void start() throws Exception
@@ -145,9 +151,19 @@
    {
       return connectorPairs.size();
    }
+   
+   public synchronized void activate()
+   {
+      active = true;
+   }
 
    public synchronized void broadcastConnectors() throws Exception
    {
+      if (!active)
+      {
+         return;
+      }
+      
       MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
      
       buff.writeString(nodeID);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
 import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
 
 import java.util.HashMap;
@@ -48,7 +50,10 @@
 import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -109,13 +114,15 @@
    private final int maxHops;
 
    private final UUID nodeUUID;
-   
+
    private final Channel replicatingChannel;
    
+   private final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
+
    private boolean backup;
 
    private volatile boolean started;
-
+   
    /*
     * Constructor using static list of connectors
     */
@@ -172,10 +179,12 @@
       this.maxHops = maxHops;
 
       this.nodeUUID = nodeUUID;
-      
+
       this.replicatingChannel = replicatingChannel;
+
+      this.backup = backup;
       
-      this.backup = backup;
+      this.staticConnectors = connectors;
 
       this.updateConnectors(connectors);
    }
@@ -236,10 +245,12 @@
       this.maxHops = maxHops;
 
       this.nodeUUID = nodeUUID;
-      
+
       this.replicatingChannel = replicatingChannel;
+
+      this.backup = backup;
       
-      this.backup = backup;
+      this.staticConnectors = null;
    }
 
    public synchronized void start() throws Exception
@@ -286,21 +297,37 @@
    {
       return name;
    }
-   
-   public synchronized void handleReplicatedUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+
+   public synchronized void activate()
    {
-      if (!backup)
+      backup = false;
+      
+      if (discoveryGroup != null)
       {
-         return;
+         connectorsChanged();
       }
-      
-      updateConnectors(connectors);
+      else
+      {
+         try
+         {
+            updateConnectors(staticConnectors);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to update connectors", e);
+         }
+      }
    }
    
    // DiscoveryListener implementation ------------------------------------------------------------------
 
    public synchronized void connectorsChanged()
    {
+      if (backup)
+      {
+         return;
+      }
+      
       try
       {
          List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
@@ -312,34 +339,10 @@
          log.error("Failed to update connectors", e);
       }
    }
-   
+
    private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
    {
-      if (replicatingChannel == null)
-      {
-         doUpdateConnectors(connectors);
-      }
-      else
-      {
-         Packet packet = new ReplicateClusterConnectionUpdate(name, connectors);
-         
-         Runnable action = new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  doUpdateConnectors(connectors);
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to update connectors", e);
-               }
-            }
-         };
-         
-         replicatingChannel.replicatePacket(packet, 1, action);
-      }
+      doUpdateConnectors(connectors);
    }
 
    private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
@@ -387,7 +390,9 @@
                // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                // actually routed to at that address though
 
-               Binding storeBinding = new LocalQueueBinding(queue.getName(), queue, new SimpleString(nodeUUID.toString()));
+               Binding storeBinding = new LocalQueueBinding(queue.getName(),
+                                                            queue,
+                                                            new SimpleString(nodeUUID.toString()));
 
                storageManager.addQueueBinding(storeBinding);
             }
@@ -411,7 +416,9 @@
                                            managementService.getManagementAddress(),
                                            managementService.getManagementNotificationAddress(),
                                            managementService.getClusterPassword(),
-                                           record);
+                                           record,
+                                           replicatingChannel,
+                                           !backup);
 
             record.setBridge(bridge);
 
@@ -513,6 +520,7 @@
          clearBindings();
       }
 
+      
       public void onMessage(final ClientMessage message)
       {
          try
@@ -538,177 +546,339 @@
 
             NotificationType ntype = NotificationType.valueOf(type.toString());
 
-            Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
-
-            if (distance == null)
-            {
-               throw new IllegalStateException("distance is null");
-            }
-
             switch (ntype.toInt())
             {
                case NotificationType.BINDING_ADDED_INDEX:
                {
+                  doBindingAdded(message, replicatingChannel);
 
-                  SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+                  break;
+               }
+               case NotificationType.BINDING_REMOVED_INDEX:
+               {
+                  doBindingRemoved(message, replicatingChannel);
 
-                  if (queueAddress == null)
-                  {
-                     throw new IllegalStateException("queueAddress is null");
-                  }
+                  break;
+               }
+               case NotificationType.CONSUMER_CREATED_INDEX:
+               {
+                  doConsumerCreated(message, replicatingChannel);
 
-                  SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+                  break;
+               }
+               case NotificationType.CONSUMER_CLOSED_INDEX:
+               {
+                  doConsumerClosed(message, replicatingChannel);
 
-                  if (clusterName == null)
-                  {
-                     throw new IllegalStateException("clusterName is null");
-                  }
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalArgumentException("Invalid type " + ntype.toInt());
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle message", e);
+         }
+      }
 
-                  SimpleString routingName = (SimpleString)message.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+      private void clearBindings() throws Exception
+      {
+         for (RemoteQueueBinding binding : bindings.values())
+         {
+            postOffice.removeBinding(binding.getUniqueName());
+         }
 
-                  if (routingName == null)
-                  {
-                     throw new IllegalStateException("routingName is null");
-                  }
+         bindings.clear();
+      }
+      
+      private void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
+      {
+         Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
-                  SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+         if (distance == null)
+         {
+            throw new IllegalStateException("distance is null");
+         }
 
-                  Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
+         SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
 
-                  if (queueID == null)
-                  {
-                     throw new IllegalStateException("queueID is null");
-                  }
+         if (queueAddress == null)
+         {
+            throw new IllegalStateException("queueAddress is null");
+         }
 
-                  RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
-                                                                          clusterName,
-                                                                          routingName,
-                                                                          queueID,
-                                                                          filterString,
-                                                                          queue,
-                                                                          useDuplicateDetection,
-                                                                          bridge.getName(),
-                                                                          distance + 1);
+         SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
-                  bindings.put(clusterName, binding);
+         if (clusterName == null)
+         {
+            throw new IllegalStateException("clusterName is null");
+         }
 
-                  if (postOffice.getBinding(clusterName) != null)
-                  {
-                     // Sanity check - this means the binding has already been added via another bridge, probably max
-                     // hops is too high
-                     // or there are multiple cluster connections for the same address
+         SimpleString routingName = (SimpleString)message.getProperty(ManagementHelper.HDR_ROUTING_NAME);
 
-                     log.warn("Remoting queue binding " + clusterName +
-                              " has already been bound in the post office. Most likely cause for this is you have a loop " +
-                              "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+         if (routingName == null)
+         {
+            throw new IllegalStateException("routingName is null");
+         }
 
-                     return;
-                  }
+         SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
-                  postOffice.addBinding(binding);
+         Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
 
-                  Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
-
-                  theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
-
-                  break;
-               }
-               case NotificationType.BINDING_REMOVED_INDEX:
+         if (queueID == null)
+         {
+            throw new IllegalStateException("queueID is null");
+         }
+         
+         if (replChannel != null)
+         {
+            Packet packet = new ReplicateRemoteBindingAddedMessage(name, queueAddress, clusterName, routingName, queueID, filterString, queue.getName(), distance + 1);
+            
+            replChannel.replicatePacket(packet, 2, new Runnable()
+            {
+               public void run()
                {
-                  SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
-                  if (clusterName == null)
+                  try
                   {
-                     throw new IllegalStateException("clusterName is null");
+                     doBindingAdded(message, null);
                   }
-
-                  RemoteQueueBinding binding = bindings.remove(clusterName);
-
-                  if (binding == null)
+                  catch (Exception e)
                   {
-                     throw new IllegalStateException("Cannot find binding for queue " + clusterName);
+                     log.error("Failed to add remote queue binding", e);
                   }
+               }
+            });
+         }
+         else
+         {   
+            RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+                                                                    clusterName,
+                                                                    routingName,
+                                                                    queueID,
+                                                                    filterString,
+                                                                    queue,
+                                                                    useDuplicateDetection,
+                                                                    bridge.getName(),
+                                                                    distance + 1);
+   
+            bindings.put(clusterName, binding);
+   
+            if (postOffice.getBinding(clusterName) != null)
+            {
+               // Sanity check - this means the binding has already been added via another bridge, probably max
+               // hops is too high
+               // or there are multiple cluster connections for the same address
+   
+               log.warn("Remoting queue binding " + clusterName +
+                        " has already been bound in the post office. Most likely cause for this is you have a loop " +
+                        "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+   
+               return;
+            }
+   
+            postOffice.addBinding(binding);
+   
+            Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
+   
+            theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
+         }
+      }
 
-                  postOffice.removeBinding(binding.getUniqueName());
+      private void doBindingRemoved(final ClientMessage message, final Channel replChannel) throws Exception
+      {
+         SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
-                  break;
-               }
-               case NotificationType.CONSUMER_CREATED_INDEX:
+         if (clusterName == null)
+         {
+            throw new IllegalStateException("clusterName is null");
+         }
+                  
+         if (replChannel != null)
+         {
+            Packet packet = new ReplicateRemoteBindingRemovedMessage(clusterName);
+            
+            replChannel.replicatePacket(packet, 2, new Runnable()
+            {
+               public void run()
                {
-                  SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
-                  if (clusterName == null)
+                  try
                   {
-                     throw new IllegalStateException("clusterName is null");
+                     doBindingRemoved(message, null);
                   }
-
-                  SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
-                  RemoteQueueBinding binding = bindings.get(clusterName);
-
-                  if (binding == null)
+                  catch (Exception e)
                   {
-                     throw new IllegalStateException("Cannot find binding for " + clusterName);
+                     log.error("Failed to remove remote queue binding", e);
                   }
+               }
+            });
+         }
+         else
+         {            
+            RemoteQueueBinding binding = bindings.remove(clusterName);
+   
+            if (binding == null)
+            {
+               throw new IllegalStateException("Cannot find binding for queue " + clusterName);
+            }
+   
+            postOffice.removeBinding(binding.getUniqueName());
+         }
+      }
 
-                  binding.addConsumer(filterString);
+      private void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
+      {
+         Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
-                  message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+         if (distance == null)
+         {
+            throw new IllegalStateException("distance is null");
+         }
 
-                  // Need to propagate the consumer add
-                  Notification notification = new Notification(ntype, message.getProperties());
+         SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
-                  managementService.sendNotification(notification);
+         if (clusterName == null)
+         {
+            throw new IllegalStateException("clusterName is null");
+         }
 
-                  break;
-               }
-               case NotificationType.CONSUMER_CLOSED_INDEX:
+         SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+         
+         if (replChannel != null)
+         {
+            Packet packet = new ReplicateRemoteConsumerAddedMessage(clusterName, filterString);
+            
+            replChannel.replicatePacket(packet, 2, new Runnable()
+            {
+               public void run()
                {
-                  SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
-                  if (clusterName == null)
+                  try
                   {
-                     throw new IllegalStateException("clusterName is null");
+                     doConsumerCreated(message, null);
                   }
-
-                  SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
-                  RemoteQueueBinding binding = bindings.get(clusterName);
-
-                  if (binding == null)
+                  catch (Exception e)
                   {
-                     throw new IllegalStateException("Cannot find binding for " + clusterName);
+                     log.error("Failed to add remote consumer", e);
                   }
+               }
+            });
+         }
+         else
+         {           
+            RemoteQueueBinding binding = bindings.get(clusterName);
+   
+            if (binding == null)
+            {
+               throw new IllegalStateException("Cannot find binding for " + clusterName);
+            }
+   
+            binding.addConsumer(filterString);
+   
+            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+   
+            // Need to propagate the consumer add
+            Notification notification = new Notification(CONSUMER_CREATED, message.getProperties());
+   
+            managementService.sendNotification(notification);
+         }
+      }
 
-                  binding.removeConsumer(filterString);
+      private void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
+      {
+         Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
 
-                  message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+         if (distance == null)
+         {
+            throw new IllegalStateException("distance is null");
+         }
 
-                  // Need to propagate the consumer close
-                  Notification notification = new Notification(ntype, message.getProperties());
+         SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
-                  managementService.sendNotification(notification);
+         if (clusterName == null)
+         {
+            throw new IllegalStateException("clusterName is null");
+         }
 
-                  break;
+         SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+         
+         if (replChannel != null)
+         {
+            Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName, filterString);
+            
+            replChannel.replicatePacket(packet, 2, new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     doConsumerClosed(message, null);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to remove remote consumer", e);
+                  }
                }
+            });
+         }
+         else
+         {           
+            RemoteQueueBinding binding = bindings.get(clusterName);
+   
+            if (binding == null)
+            {
+               throw new IllegalStateException("Cannot find binding for " + clusterName);
             }
+   
+            binding.removeConsumer(filterString);
+   
+            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+   
+            // Need to propagate the consumer close
+            Notification notification = new Notification(CONSUMER_CLOSED, message.getProperties());
+   
+            managementService.sendNotification(notification);
          }
-         catch (Exception e)
-         {
-            log.error("Failed to handle message", e);
-         }
       }
 
-      private void clearBindings() throws Exception
+
+   }
+   
+   
+
+   public void handleReplicatedAddBinding(final SimpleString address,
+                                          final SimpleString uniqueName,
+                                          final SimpleString routingName,
+                                          final int queueID,
+                                          final SimpleString filterString,
+                                          final SimpleString queueName,
+                                          final int distance) throws Exception
+   {
+      Binding queueBinding = postOffice.getBinding(queueName);
+
+      if (queueBinding == null)
       {
-         for (RemoteQueueBinding binding : bindings.values())
-         {
-            postOffice.removeBinding(binding.getUniqueName());
-         }
-
-         bindings.clear();
+         throw new IllegalStateException("Cannot find s & f queue " + queueName);
       }
 
+      Queue queue = (Queue)queueBinding.getBindable();
+
+      RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+                                                              uniqueName,
+                                                              routingName,
+                                                              queueID,
+                                                              filterString,
+                                                              queue,
+                                                              useDuplicateDetection,
+                                                              queueName,
+                                                              distance);
+
+      postOffice.addBinding(binding);
+
+      Bindings theBindings = postOffice.getBindingsForAddress(address);
+
+      theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -47,6 +47,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -94,8 +95,12 @@
    private final QueueFactory queueFactory;
 
    private final UUID nodeUUID;
+   
+   private Channel replicatingChannel;
 
    private volatile boolean started;
+   
+   private boolean backup;
 
    public ClusterManagerImpl(final org.jboss.messaging.utils.ExecutorFactory executorFactory,
                              final StorageManager storageManager,
@@ -104,7 +109,9 @@
                              final ManagementService managementService,
                              final Configuration configuration,
                              final QueueFactory queueFactory,
-                             final UUID nodeUUID)
+                             final UUID nodeUUID,
+                             final Channel replicatingChannel,
+                             final boolean backup)
    {
       this.executorFactory = executorFactory;
 
@@ -121,6 +128,10 @@
       this.queueFactory = queueFactory;
 
       this.nodeUUID = nodeUUID;
+      
+      this.replicatingChannel = replicatingChannel;
+      
+      this.backup = backup;
    }
 
    public synchronized void start() throws Exception
@@ -212,6 +223,28 @@
    {
       return clusters.get(name.toString()); 
    }
+   
+   public synchronized void activate()
+   {
+      for (BroadcastGroup bg: broadcastGroups.values())
+      {
+         bg.activate();
+      }
+      
+      for (Bridge bridge: bridges.values())
+      {
+         bridge.activate();
+      }
+      
+      for (ClusterConnection cc: clusters.values())
+      {
+         cc.activate();
+      }
+      
+      replicatingChannel = null;
+      
+      backup = false;
+   }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
    {
@@ -229,7 +262,8 @@
                                                         config.getName(),
                                                         config.getLocalBindPort(),
                                                         groupAddress,
-                                                        config.getGroupPort());
+                                                        config.getGroupPort(),
+                                                        !backup);
 
       for (Pair<String, String> connectorInfo : config.getConnectorInfos())
       {
@@ -383,6 +417,8 @@
          Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
                                                                                                                               backupConnector);
 
+         log.info("deploying bridge, backup is " + backup);
+         
          bridge = new BridgeImpl(nodeUUID,
                                  new SimpleString(config.getName()),
                                  queue,
@@ -399,7 +435,9 @@
                                  config.isUseDuplicateDetection(),
                                  managementService.getManagementAddress(),
                                  managementService.getManagementNotificationAddress(),
-                                 managementService.getClusterPassword());
+                                 managementService.getClusterPassword(),
+                                 replicatingChannel,
+                                 !backup);
 
          bridges.put(config.getName(), bridge);
 
@@ -481,8 +519,8 @@
                                                        connectors,
                                                        config.getMaxHops(),
                                                        nodeUUID,
-                                                       null,
-                                                       false);
+                                                       replicatingChannel,
+                                                       backup);
       }
       else
       {
@@ -511,8 +549,8 @@
                                                        dg,
                                                        config.getMaxHops(),
                                                        nodeUUID,
-                                                       null,
-                                                       false);
+                                                       replicatingChannel,
+                                                       backup);
       }
 
       managementService.registerCluster(clusterConnection, config);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -380,7 +380,9 @@
                                                  managementService,
                                                  configuration,
                                                  queueFactory,
-                                                 uuid);
+                                                 uuid,
+                                                 getReplicatingChannel(),
+                                                 configuration.isBackup());
 
          clusterManager.start();
       }
@@ -442,7 +444,6 @@
       postOffice = null;
       securityRepository = null;
       securityStore = null;
-      addressSettingsRepository.clear();
       scheduledExecutor.shutdown();
       queueFactory = null;
       resourceManager = null;
@@ -581,6 +582,11 @@
          configuration.setBackup(false);
 
          remotingService.setBackup(false);
+         
+         if (clusterManager != null)
+         {
+            clusterManager.activate();
+         }
       }
 
       connection.activate();
@@ -645,34 +651,34 @@
       }
    }
 
-   public CreateSessionResponseMessage replicateCreateSession(final String name,
-                                                              final long replicatedChannelID,
-                                                              final long originalChannelID,
-                                                              final String username,
-                                                              final String password,
-                                                              final int minLargeMessageSize,
-                                                              final int incrementingVersion,
-                                                              final RemotingConnection connection,
-                                                              final boolean autoCommitSends,
-                                                              final boolean autoCommitAcks,
-                                                              final boolean preAcknowledge,
-                                                              final boolean xa,
-                                                              final int sendWindowSize) throws Exception
+   public void replicateCreateSession(final String name,
+                                      final long replicatedChannelID,
+                                      final long originalChannelID,
+                                      final String username,
+                                      final String password,
+                                      final int minLargeMessageSize,
+                                      final int incrementingVersion,
+                                      final RemotingConnection connection,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge,
+                                      final boolean xa,
+                                      final int sendWindowSize) throws Exception
    {
-      return doCreateSession(name,
-                             replicatedChannelID,
-                             originalChannelID,
-                             username,
-                             password,
-                             minLargeMessageSize,
-                             incrementingVersion,
-                             connection,
-                             autoCommitSends,
-                             autoCommitAcks,
-                             preAcknowledge,
-                             xa,
-                             sendWindowSize,
-                             true);
+      doCreateSession(name,
+                      replicatedChannelID,
+                      originalChannelID,
+                      username,
+                      password,
+                      minLargeMessageSize,
+                      incrementingVersion,
+                      connection,
+                      autoCommitSends,
+                      autoCommitAcks,
+                      preAcknowledge,
+                      xa,
+                      sendWindowSize,
+                      true);
    }
 
    public CreateSessionResponseMessage createSession(final String name,
@@ -717,19 +723,6 @@
       return sessions.get(name);
    }
 
-   public void updateClusterConnectionConnectors(final SimpleString clusterConnectionName,
-                                                 final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
-   {
-      ClusterConnection cc = clusterManager.getClusterConnection(clusterConnectionName);
-
-      if (cc == null)
-      {
-         throw new IllegalStateException("Cannot find cluster connection with name " + clusterConnectionName);
-      }
-
-      cc.handleReplicatedUpdateConnectors(connectors);
-   }
-
    public List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -1099,7 +1092,14 @@
 
          if (conn != null)
          {
-            conn.fail(me);
+            // Execute on different thread to avoid deadlocks
+            new Thread()
+            {
+               public void run()
+               {
+                  conn.fail(me);
+               }
+            }.start();
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -13,22 +13,33 @@
 package org.jboss.messaging.core.server.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_UPDATE_CONNECTORS;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
 
 /**
  * A packet handler for all packets that need to be handled at the server level
@@ -90,22 +101,63 @@
 
             break;
          }
-         case REPLICATE_UPDATE_CONNECTORS:
+         case CREATE_QUEUE:
          {
-            ReplicateClusterConnectionUpdate request = (ReplicateClusterConnectionUpdate)packet;
-            
-            handleClusterConnectionUpdate(request);
-            
+            // Create queue can also be fielded here in the case of a replicated store and forward queue creation
+
+            CreateQueueMessage request = (CreateQueueMessage)packet;
+
+            handleCreateQueue(request);
+
             break;
-            
          }
+         case PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+         {
+            ReplicateRemoteBindingAddedMessage request = (ReplicateRemoteBindingAddedMessage)packet;
+
+            handleAddRemoteQueueBinding(request);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+         {
+            ReplicateRemoteBindingRemovedMessage request = (ReplicateRemoteBindingRemovedMessage)packet;
+
+            handleRemoveRemoteQueueBinding(request);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER:
+         {
+            ReplicateRemoteConsumerAddedMessage request = (ReplicateRemoteConsumerAddedMessage)packet;
+
+            handleAddRemoteConsumer(request);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER:
+         {
+            ReplicateRemoteConsumerRemovedMessage request = (ReplicateRemoteConsumerRemovedMessage)packet;
+
+            handleRemoveRemoteConsumer(request);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_ACKNOWLEDGE:
+         {
+            ReplicateAcknowledgeMessage request = (ReplicateAcknowledgeMessage)packet;
+
+            handleReplicateAcknowledge(request);
+
+            break;
+         }
          default:
          {
             log.error("Invalid packet " + packet);
          }
       }
    }
-   
+
    private void doHandleCreateSession(final CreateSessionMessage request, final long oppositeChannelID)
    {
       Packet response;
@@ -138,7 +190,7 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
-      
+
       channel1.send(response);
    }
 
@@ -179,41 +231,28 @@
 
    private void handleReplicateCreateSession(final ReplicateCreateSessionMessage request)
    {
-      Packet response;
-
       try
       {
-         response = server.replicateCreateSession(request.getName(),
-                                                  request.getReplicatedSessionChannelID(),
-                                                  request.getOriginalSessionChannelID(),
-                                                  request.getUsername(),
-                                                  request.getPassword(),
-                                                  request.getMinLargeMessageSize(),
-                                                  request.getVersion(),
-                                                  connection,
-                                                  request.isAutoCommitSends(),
-                                                  request.isAutoCommitAcks(),
-                                                  request.isPreAcknowledge(),
-                                                  request.isXA(),
-                                                  request.getWindowSize());
+         server.replicateCreateSession(request.getName(),
+                                       request.getReplicatedSessionChannelID(),
+                                       request.getOriginalSessionChannelID(),
+                                       request.getUsername(),
+                                       request.getPassword(),
+                                       request.getMinLargeMessageSize(),
+                                       request.getVersion(),
+                                       connection,
+                                       request.isAutoCommitSends(),
+                                       request.isAutoCommitAcks(),
+                                       request.isPreAcknowledge(),
+                                       request.isXA(),
+                                       request.getWindowSize());
       }
       catch (Exception e)
       {
          log.error("Failed to handle replicate create session", e);
-
-         if (e instanceof MessagingException)
-         {
-            response = new MessagingExceptionMessage((MessagingException)e);
-         }
-         else
-         {
-            response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
-         }
       }
-      
-      channel1.send(response);
    }
-   
+
    private void handleReattachSession(final ReattachSessionMessage request)
    {
       Packet response;
@@ -235,15 +274,15 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
-      
+
       channel1.send(response);
    }
-   
-   private void handleClusterConnectionUpdate(final ReplicateClusterConnectionUpdate request)
+
+   private void handleCreateQueue(final CreateQueueMessage request)
    {
       try
       {
-         server.updateClusterConnectionConnectors(request.getClusterConnectionName(), request.getConnectors());
+         server.getServerManagement().createQueue(request.getAddress().toString(), request.getQueueName().toString());
       }
       catch (Exception e)
       {
@@ -251,4 +290,108 @@
       }
    }
 
+   private void handleAddRemoteQueueBinding(final ReplicateRemoteBindingAddedMessage request)
+   {
+      ClusterConnection cc = server.getClusterManager().getClusterConnection(request.getClusterConnectionName());
+
+      if (cc == null)
+      {
+         throw new IllegalStateException("No cluster connection found with name " + request.getClusterConnectionName());
+      }
+
+      try
+      {
+         cc.handleReplicatedAddBinding(request.getAddress(),
+                                       request.getUniqueName(),
+                                       request.getRoutingName(),
+                                       request.getRemoteQueueID(),
+                                       request.getFilterString(),
+                                       request.getSfQueueName(),
+                                       request.getDistance());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle add remote queue binding", e);
+      }
+   }
+
+   private void handleRemoveRemoteQueueBinding(final ReplicateRemoteBindingRemovedMessage request)
+   {
+      try
+      {
+         Binding binding = server.getPostOffice().removeBinding(request.getUniqueName());
+
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueName());
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle remove remote queue binding", e);
+      }
+   }
+
+   private void handleAddRemoteConsumer(final ReplicateRemoteConsumerAddedMessage request)
+   {
+      RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
+                                                             .getBinding(request.getUniqueBindingName());
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
+      }
+
+      try
+      {
+         binding.addConsumer(request.getFilterString());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle add remote consumer", e);
+      }
+   }
+
+   private void handleRemoveRemoteConsumer(final ReplicateRemoteConsumerRemovedMessage request)
+   {
+      RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
+                                                             .getBinding(request.getUniqueBindingName());
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
+      }
+
+      try
+      {
+         binding.removeConsumer(request.getFilterString());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle remove remote consumer", e);
+      }
+   }
+
+   private void handleReplicateAcknowledge(final ReplicateAcknowledgeMessage request)
+   {
+      Binding binding = server.getPostOffice().getBinding(request.getUniqueName());
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find binding " + request.getUniqueName());
+      }
+
+      try
+      {
+         Queue queue = (Queue)binding.getBindable();
+         
+         MessageReference ref = queue.removeFirstReference(request.getMessageID());
+         
+         queue.acknowledge(ref);
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle remove remote consumer", e);
+      }
+   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -43,7 +43,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -371,7 +371,7 @@
       }
    }
 
-   public void handleCreateQueue(final SessionCreateQueueMessage packet)
+   public void handleCreateQueue(final CreateQueueMessage packet)
    {
       if (replicatingChannel == null)
       {
@@ -1179,7 +1179,7 @@
       boolean browseOnly = packet.isBrowseOnly();
 
       Packet response = null;
-
+      
       try
       {
          Binding binding = postOffice.getBinding(name);
@@ -1284,7 +1284,7 @@
 
    }
 
-   private void doHandleCreateQueue(final SessionCreateQueueMessage packet)
+   private void doHandleCreateQueue(final CreateQueueMessage packet)
    {
       SimpleString address = packet.getAddress();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -18,8 +18,8 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -55,7 +55,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -120,13 +120,13 @@
                session.handleCreateConsumer(request);
                break;
             }
-            case SESS_CREATE_QUEUE:
+            case CREATE_QUEUE:
             {
-               SessionCreateQueueMessage request = (SessionCreateQueueMessage)packet;
+               CreateQueueMessage request = (CreateQueueMessage)packet;
                session.handleCreateQueue(request);             
                break;
             }
-            case SESS_DELETE_QUEUE:
+            case DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
                session.handleDeleteQueue(request);             

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -166,8 +166,8 @@
                                                            configuration);
 
       this.useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
-                                                           TransportConstants.DEFAULT_USE_INVM,
-                                                           configuration);
+                                                            TransportConstants.DEFAULT_USE_INVM,
+                                                            configuration);
       this.host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
                                                         TransportConstants.DEFAULT_HOST,
                                                         configuration);
@@ -218,7 +218,7 @@
       bossExecutor = Executors.newCachedThreadPool(new org.jboss.messaging.utils.JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
       workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
 
-      if(useInvm)
+      if (useInvm)
       {
          channelFactory = new DefaultLocalServerChannelFactory();
       }
@@ -296,7 +296,7 @@
       for (String h : hosts)
       {
          SocketAddress address;
-         if(useInvm)
+         if (useInvm)
          {
             address = new LocalAddress(h);
             System.out.println("address = " + address);
@@ -312,7 +312,7 @@
 
    public synchronized void stop()
    {
-      
+
       if (channelFactory == null)
       {
          return;
@@ -389,7 +389,7 @@
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
-         
+
          listener.connectionCreated(connection);
       }
 
@@ -397,12 +397,12 @@
       {
          if (connections.remove(connectionID) != null)
          {
-            //Execute on different thread to avoid deadlocks
+            // Execute on different thread to avoid deadlocks
             new Thread()
             {
                public void run()
                {
-                  listener.connectionDestroyed(connectionID);               
+                  listener.connectionDestroyed(connectionID);
                }
             }.start();
          }
@@ -410,7 +410,15 @@
 
       public void connectionException(final Object connectionID, final MessagingException me)
       {
-         listener.connectionException(connectionID, me);
+         // Execute on different thread to avoid deadlocks
+         new Thread()
+         {
+            public void run()
+            {
+               listener.connectionException(connectionID, me);
+            }
+         }.start();
+
       }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -582,7 +582,14 @@
 
       public void connectionException(final Object connectionID, final MessagingException me)
       {
-         listener.connectionException(connectionID, me);
+         // Execute on different thread to avoid deadlocks
+         new Thread()
+         {
+            public void run()
+            {
+               listener.connectionException(connectionID, me);
+            }
+         }.start();         
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/utils/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/TypedProperties.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/utils/TypedProperties.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -336,7 +336,7 @@
          properties.clear();
       }
    }
-
+   
    // Private ------------------------------------------------------------------------------------
 
    private void checkCreateProperties()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -157,7 +157,7 @@
             }
          }
 
-         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+        // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {
@@ -165,7 +165,7 @@
             return;
          }
 
-         Thread.sleep(10);
+         Thread.sleep(100);
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
@@ -396,7 +396,7 @@
 
             if (message != null)
             {
-               log.info("Consumer " + consumerIDs[i] + " received message " + message.getProperty(COUNT_PROP));
+               //log.info("Consumer " + consumerIDs[i] + " received message " + message.getProperty(COUNT_PROP));
             }
          }
          while (message != null);
@@ -515,13 +515,13 @@
          ClientMessage message;
          do
          {
-            message = holder.consumer.receive(200);
+            message = holder.consumer.receive(1000);
 
             if (message != null)
             {
                int count = (Integer)message.getProperty(COUNT_PROP);
 
-               //log.info("consumer " + consumerIDs[i] + " received message " + count);
+              // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
                assertFalse(counts.contains(count));
 
@@ -1057,7 +1057,7 @@
    }
 
    protected void stopServers(int... nodes) throws Exception
-   {
+   {      
       for (int i = 0; i < nodes.length; i++)
       {
          if (services[nodes[i]].isStarted())

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -69,7 +69,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -114,7 +114,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -181,7 +181,7 @@
       
       String nodeID = randomString();
 
-      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -263,7 +263,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -299,7 +299,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -343,13 +343,13 @@
 
       final int timeout = 500;
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1, true);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2, true);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3, true);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -428,7 +428,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -471,7 +471,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 
@@ -583,13 +583,13 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
       bg1.start();
 
-      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
       bg2.start();
 
-      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
       bg3.start();
 
       TransportConfiguration live1 = generateTC();
@@ -823,7 +823,7 @@
       final int groupPort = 6745;
       final int timeout = 500;
 
-      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+      BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
 
       bg.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -61,7 +61,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -107,7 +107,7 @@
       boolean durable = randomBoolean();
       boolean temporary = randomBoolean();
 
-      SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, null, durable, false);
+      CreateQueueMessage request = new CreateQueueMessage(address, queueName, null, durable, false);
 
       // SimpleString version
       expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
@@ -128,7 +128,7 @@
       verifyMocks();
 
       // with temporary
-      request = new SessionCreateQueueMessage(address, queueName, null, durable, temporary);
+      request = new CreateQueueMessage(address, queueName, null, durable, temporary);
 
       resetMocks();
 
@@ -142,7 +142,7 @@
       // full methods
       resetMocks();
 
-      request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temporary);
+      request = new CreateQueueMessage(address, queueName, filterString, durable, temporary);
 
       expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
       replayMocks();
@@ -154,7 +154,7 @@
       // full methods with String
       resetMocks();
 
-      request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temporary);
+      request = new CreateQueueMessage(address, queueName, filterString, durable, temporary);
 
       expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
       replayMocks();
@@ -596,7 +596,7 @@
    // CommandManager cm = EasyMock.createStrictMock(CommandManager.class);
    // ConnectionRegistry reg = EasyMock.createStrictMock(ConnectionRegistry.class);
    //
-   // SessionCreateQueueMessage request = new SessionCreateQueueMessage(new SimpleString("blah"), new
+   // CreateQueueMessage request = new CreateQueueMessage(new SimpleString("blah"), new
    // SimpleString("hagshg"),
    // new SimpleString("jhjhs"), false, false);
    //

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java	2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java	2009-03-09 14:53:41 UTC (rev 6049)
@@ -76,7 +76,7 @@
 //
 //   public void testCreateQueue() throws Exception
 //   {
-//      SessionCreateQueueMessage request = new SessionCreateQueueMessage(queueName, queueName, filterString, true, true);
+//      CreateQueueMessage request = new CreateQueueMessage(queueName, queueName, filterString, true, true);
 //      session.createQueue(queueName, queueName, filterString, true, true);
 //      long responseTargetID = 1212;
 //      request.setResponseTargetID(responseTargetID);




More information about the jboss-cvs-commits mailing list