[jboss-cvs] JBoss Messaging SVN: r5686 - in trunk: src/main/org/jboss/messaging/core/persistence and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 22 06:16:47 EST 2009


Author: timfox
Date: 2009-01-22 06:16:47 -0500 (Thu, 22 Jan 2009)
New Revision: 5686

Added:
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   trunk/src/main/org/jboss/messaging/core/server/Bindable.java
   trunk/src/main/org/jboss/messaging/core/server/Divert.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/schemas/jbm-configuration.xsd
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
   trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java
Log:
more clustering, tweaks, fixes etc

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -87,7 +87,7 @@
          for (int i = 0; i < bindings.getBindings().size(); i++)
          {
             Binding binding = bindings.getBindings().get(i);
-            queueNames[i] = binding.getBindable().getUniqueName().toString();
+            queueNames[i] = binding.getUniqueName().toString();
          }
          return queueNames;
       }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -54,10 +54,10 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
@@ -297,7 +297,7 @@
       if (postOffice.getBinding(sName) == null)
       {
          Queue queue = queueFactory.createQueue(-1, sName, null, true, false);
-         QueueBinding binding = new QueueBindingImpl(sAddress, queue);
+         Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
          storageManager.addQueueBinding(binding);
          postOffice.addBinding(binding);
       }
@@ -316,7 +316,7 @@
       if (postOffice.getBinding(sName) == null)
       {
          Queue queue = queueFactory.createQueue(-1, sName, filter, durable, false);
-         QueueBinding binding = new QueueBindingImpl(sAddress, queue);
+         Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
          if (durable)
          {
             storageManager.addQueueBinding(binding);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -28,8 +28,8 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.server.LargeServerMessage;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
@@ -110,7 +110,7 @@
 
    // Bindings related operations
 
-   void addQueueBinding(QueueBinding binding) throws Exception;
+   void addQueueBinding(Binding binding) throws Exception;
    
    void deleteQueueBinding(long queueBindingID) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -60,8 +60,8 @@
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.QueueBindingInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.XidCodecSupport;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -863,20 +863,20 @@
 
    // Bindings operations
 
-   public void addQueueBinding(final QueueBinding binding) throws Exception
+   public void addQueueBinding(final Binding binding) throws Exception
    {
-      Filter filter = binding.getQueue().getFilter();
+      Queue queue = (Queue)binding.getBindable();
+      Filter filter = queue.getFilter();
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(binding.getBindable()
-                                                                                                 .getUniqueName(),
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
                                                                                           binding.getAddress(),
                                                                                           filterString);
 
       long id = this.generateUniqueID();
 
-      binding.getQueue().setPersistenceID(id);
+      queue.setPersistenceID(id);
 
       bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding);
    }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -30,8 +30,8 @@
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.persistence.QueueBindingInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.server.LargeServerMessage;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
@@ -58,7 +58,7 @@
 
    private volatile boolean started;
 
-   public void addQueueBinding(final QueueBinding queueBinding) throws Exception
+   public void addQueueBinding(final Binding queueBinding) throws Exception
    {
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -18,14 +18,14 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.postoffice;
 
 import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
 
-
 /**
  * 
  * A Binding
@@ -36,8 +36,16 @@
 public interface Binding
 {
    SimpleString getAddress();
-         
-   Bindable getBindable();    
-   
+
+   Bindable getBindable();
+
    boolean isQueueBinding();
+
+   SimpleString getUniqueName();
+
+   SimpleString getRoutingName();
+
+   boolean accept(ServerMessage message) throws Exception;
+
+   boolean isExclusive();
 }

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.Divert;
-
-/**
- * A DivertBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 9 Jan 2009 18:28:42
- *
- *
- */
-public interface DivertBinding extends Binding
-{
-   public Divert getDivert();
-}

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.Queue;
-
-/**
- * A QueueBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 9 Jan 2009 17:18:44
- *
- *
- */
-public interface QueueBinding extends Binding
-{
-   public Queue getQueue();
-}

Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A BindingImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 18:52:15
+ *
+ *
+ */
+public class BindingImpl implements Binding
+{
+   private final SimpleString address;
+
+   private final SimpleString uniqueName;
+
+   private final SimpleString routingName;
+
+   protected final Bindable bindable;
+
+   private final boolean exclusive;
+
+   private final boolean isQueue;
+
+   public BindingImpl(final SimpleString address,
+                      final SimpleString uniqueName,
+                      final SimpleString routingName,
+                      final Bindable bindable,                      
+                      final boolean exclusive,
+                      final boolean isQueue)
+   {
+      this.address = address;
+
+      this.uniqueName = uniqueName;
+
+      this.routingName = routingName;
+
+      this.bindable = bindable;
+
+      this.exclusive = exclusive;
+
+      this.isQueue = isQueue;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public Bindable getBindable()
+   {
+      return bindable;
+   }
+
+   public boolean isQueueBinding()
+   {
+      return isQueue;
+   }
+
+   public boolean accept(final ServerMessage message) throws Exception
+   {
+      return bindable.accept(message);
+   }
+
+   public SimpleString getRoutingName()
+   {
+      return routingName;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   public boolean isExclusive()
+   {
+      return exclusive;
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -67,13 +67,13 @@
 
    public void addBinding(final Binding binding)
    {
-      if (binding.getBindable().isExclusive())
+      if (binding.isExclusive())
       {
          exclusiveBindings.add(binding);
       }
       else
       {
-         SimpleString routingName = binding.getBindable().getRoutingName();
+         SimpleString routingName = binding.getRoutingName();
          
          List<Binding> bindings = routingNameBindingMap.get(routingName);
          
@@ -97,13 +97,13 @@
    
    public void removeBinding(final Binding binding)
    {
-      if (binding.getBindable().isExclusive())
+      if (binding.isExclusive())
       {
          exclusiveBindings.remove(binding);
       }
       else
       {
-         SimpleString routingName = binding.getBindable().getRoutingName();
+         SimpleString routingName = binding.getRoutingName();
          
          List<Binding> bindings = routingNameBindingMap.get(routingName);
          
@@ -189,11 +189,11 @@
                   pos = 0;
                }
                
-               if (binding.getBindable().accept(message))
+               if (binding.accept(message))
                {
                   chosen.add(binding.getBindable());
                   
-                  SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getBindable().getRoutingName());
+                  SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getRoutingName());
                   
                   message.putBooleanProperty(headerName, Boolean.valueOf(true)); 
                   

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.DivertBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Divert;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A DivertBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 9 Jan 2009 15:45:09
- *
- *
- */
-public class DivertBindingImpl implements DivertBinding
-{
-   private SimpleString address;
-   
-   private Divert divert;
-   
-   public DivertBindingImpl(final SimpleString address, final Divert divert)
-   {
-      this.address = address;
-      
-      this.divert = divert;  
-   }
-        
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public Bindable getBindable()
-   {      
-      return divert;
-   }
-   
-   public Divert getDivert()
-   {
-      return divert;
-   }
-
-   public boolean isQueueBinding()
-   {
-      return false;
-   }
-}

Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.impl.FlowBindingFilter;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 18:55:22
+ *
+ *
+ */
+public class FlowBinding extends BindingImpl
+{
+   private final FlowBindingFilter filter;
+
+   public FlowBinding(final SimpleString address,
+                      final SimpleString uniqueName,
+                      final SimpleString routingName,
+                      final Bindable bindable,
+                      final FlowBindingFilter filter)
+   {
+      super(address, uniqueName, routingName, bindable, false, false);
+
+      this.filter = filter;
+   }
+
+   public boolean accept(final ServerMessage message) throws Exception
+   {
+      if (filter.match(message))
+      {
+         return bindable.accept(message);
+      }
+      else
+      {
+         return false;
+      }
+   }
+   
+   public FlowBindingFilter getFilter()
+   {
+      return filter;
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.DivertBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Divert;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A LinkBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 9 Jan 2009 18:29:24
- *
- *
- */
-public class LinkBindingImpl implements DivertBinding
-{
-   private final SimpleString address;
-   
-   private final Divert link;
-   
-   public LinkBindingImpl(final SimpleString address, final Divert link)
-   {
-      this.address = address;
-      
-      this.link = link;      
-   }     
-  
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public Bindable getBindable()
-   {
-      return link;
-   }
-
-   public boolean isQueueBinding()
-   {
-      return false;
-   }
-
-   public Divert getDivert()
-   {      
-      return link;
-   }
-}

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A QueueBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 9 Jan 2009 15:45:09
- *
- *
- */
-public class QueueBindingImpl implements QueueBinding
-{
-   private SimpleString address;
-   
-   private Queue queue;
-   
-   public QueueBindingImpl(final SimpleString address, final Queue queue)
-   {
-      this.address = address;
-      
-      this.queue = queue;      
-   }
-        
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public Bindable getBindable()
-   {      
-      return queue;
-   }
-   
-   public Queue getQueue()
-   {
-      return queue;
-   }
-
-   public boolean isQueueBinding()
-   {
-      return true;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -50,7 +50,7 @@
 
    public void addBinding(final Binding binding)
    {
-      if (nameMap.putIfAbsent(binding.getBindable().getUniqueName(), binding) != null)
+      if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
       {
          throw new IllegalStateException("Binding already exists " + binding);
       }
@@ -157,7 +157,7 @@
 
       for (Binding binding: bindings.getBindings())
       {
-         if (binding.getBindable().getUniqueName().equals(bindableName))
+         if (binding.getUniqueName().equals(bindableName))
          {
             theBinding = binding;
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -145,7 +145,7 @@
                {
                   for (Binding b : bindings.getBindings())
                   {
-                     super.removeMapping(address, b.getBindable().getUniqueName());
+                     super.removeMapping(address, b.getUniqueName());
                   }
                }
             }

Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.server;
 
 import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A Bindable
@@ -38,12 +37,6 @@
 {
    void route(ServerMessage message, Transaction tx) throws Exception;
    
-   SimpleString getUniqueName();
-   
-   SimpleString getRoutingName();
-   
    boolean accept(ServerMessage message) throws Exception;
-   
-   boolean isExclusive();
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/Divert.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Divert.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/Divert.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -24,6 +24,7 @@
 package org.jboss.messaging.core.server;
 
 
+
 /**
  * A Divert
  *
@@ -35,5 +36,5 @@
  */
 public interface Divert extends Bindable
 {
-   boolean isExclusive();
+   //boolean isExclusive();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -433,7 +433,8 @@
             }
             else
             {
-               dest = (SimpleString)message.getProperty(MessageImpl.HDR_ORIGINAL_DESTINATION);
+               //Preserve the original address
+               dest = message.getDestination();
             }
 
             producer.send(dest, message);

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,401 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.FlowBinding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A ClusterImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 14:43:05
+ *
+ *
+ */
+public class ClusterImpl implements DiscoveryListener
+{
+   private static final Logger log = Logger.getLogger(ClusterImpl.class);
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final SimpleString name;
+
+   private final SimpleString address;
+
+   private final BridgeConfiguration bridgeConfig;
+
+   private final boolean useDuplicateDetection;
+
+   private final int maxHops;
+
+   private Map<Pair<TransportConfiguration, TransportConfiguration>, Bridge> bridges = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Bridge>();
+
+   private final DiscoveryGroup discoveryGroup;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final QueueFactory queueFactory;
+
+   private volatile boolean started;
+
+   /*
+    * Constructor using static list of connectors
+    */
+   public ClusterImpl(final SimpleString name,
+                      final SimpleString address,
+                      final BridgeConfiguration bridgeConfig,
+                      final boolean useDuplicateDetection,
+                      final int maxHops,
+                      final ExecutorFactory executorFactory,
+                      final StorageManager storageManager,
+                      final PostOffice postOffice,
+                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                      final ScheduledExecutorService scheduledExecutor,
+                      final QueueFactory queueFactory,
+                      final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      this.name = name;
+
+      this.address = address;
+
+      this.bridgeConfig = bridgeConfig;
+
+      this.useDuplicateDetection = useDuplicateDetection;
+
+      this.maxHops = maxHops;
+
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.discoveryGroup = null;
+
+      this.scheduledExecutor = scheduledExecutor;
+
+      this.queueFactory = queueFactory;
+
+      this.updateConnectors(connectors);
+   }
+
+   /*
+    * Constructor using discovery to get connectors
+    */
+   public ClusterImpl(final SimpleString name,
+                      final SimpleString address,
+                      final BridgeConfiguration bridgeConfig,
+                      final boolean useDuplicateDetection,
+                      final int maxHops,
+                      final ExecutorFactory executorFactory,
+                      final StorageManager storageManager,
+                      final PostOffice postOffice,
+                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                      final ScheduledExecutorService scheduledExecutor,
+                      final QueueFactory queueFactory,
+                      final DiscoveryGroup discoveryGroup) throws Exception
+   {
+      this.name = name;
+
+      this.address = address;
+      
+      this.bridgeConfig = bridgeConfig;
+
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.scheduledExecutor = scheduledExecutor;
+      
+      this.queueFactory = queueFactory;
+
+      this.discoveryGroup = discoveryGroup;
+
+      this.useDuplicateDetection = useDuplicateDetection;
+
+      this.maxHops = maxHops;
+   }
+
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      if (discoveryGroup != null)
+      {
+         updateConnectors(discoveryGroup.getConnectors());
+
+         discoveryGroup.registerListener(this);
+      }
+
+      started = true;
+   }
+
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (discoveryGroup != null)
+      {
+         discoveryGroup.unregisterListener(this);
+      }
+
+      for (Bridge bridge : bridges.values())
+      {
+         bridge.stop();
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   // DiscoveryListener implementation ------------------------------------------------------------------
+
+   public void connectorsChanged()
+   {
+      try
+      {
+         List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
+
+         updateConnectors(connectors);
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to update connectors", e);
+      }
+   }
+
+   private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
+
+      connectorSet.addAll(connectors);
+
+      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge>> iter = bridges.entrySet()
+                                                                                                      .iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge> entry = iter.next();
+
+         if (!connectorSet.contains(entry.getKey()))
+         {
+            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may have messages - this is up to the admininstrator to do this
+
+            entry.getValue().stop();
+
+            iter.remove();
+         }
+      }
+
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+      {
+         if (!bridges.containsKey(connectorPair))
+         {
+            SimpleString queueName = new SimpleString("cluster." + name +
+                                                      "." +
+                                                      generateConnectorString(connectorPair.a) +
+                                                      "-" +
+                                                      (connectorPair.b == null ? "null"
+                                                                              : generateConnectorString(connectorPair.b)));
+
+            Binding queueBinding = postOffice.getBinding(queueName);
+
+            Queue queue;
+
+            if (queueBinding != null)
+            {
+               queue = (Queue)queueBinding.getBindable();
+            }
+            else
+            {
+               queue = queueFactory.createQueue(-1, name, null, true, false);
+
+               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never actually routed to at that address though
+
+               Binding storeBinding = new BindingImpl(queue.getName(), queue.getName(), queue.getName(), queue, false, true);
+
+               storageManager.addQueueBinding(storeBinding);
+            }
+
+            Bridge bridge = new BridgeImpl(queueName,
+                                           queue,
+                                           connectorPair,
+                                           executorFactory.getExecutor(),
+                                           bridgeConfig.getMaxBatchSize(),
+                                           bridgeConfig.getMaxBatchTime(),
+                                           new SimpleString(bridgeConfig.getFilterString()),
+                                           null,
+                                           storageManager,
+                                           scheduledExecutor,
+                                           null,
+                                           bridgeConfig.getRetryInterval(),
+                                           bridgeConfig.getRetryIntervalMultiplier(),
+                                           bridgeConfig.getMaxRetriesBeforeFailover(),
+                                           bridgeConfig.getMaxRetriesAfterFailover(),
+                                           false);
+
+            bridges.put(connectorPair, bridge);
+
+            bridge.start();
+         }
+      }
+   }
+       
+   private void updateQueueInfo(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final QueueInfo info) throws Exception
+   {
+      Bridge bridge = this.bridges.get(connectorPair);
+      
+      if (bridge == null)
+      {
+         throw new IllegalArgumentException("Cannot find bridge for " + connectorPair);
+      }
+      
+      SimpleString uniqueName = null;  // ?????
+      
+      FlowBinding flowBinding = (FlowBinding)postOffice.getBinding(uniqueName);
+      
+      if (flowBinding == null)
+      {
+         //TODO - can be optimised by storing the queue with the bridge in this class
+         Binding binding = postOffice.getBinding(bridge.getName());
+         
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find queue with name " + bridge.getName());
+         }
+         
+         Queue queue = (Queue)binding.getBindable();
+           
+         FlowBindingFilter filter = new FlowBindingFilter(info);
+         
+         flowBinding = new FlowBinding(new SimpleString(info.getAddress()), uniqueName, new SimpleString(info.getQueueName()), queue, filter);
+         
+         postOffice.addBinding(flowBinding);
+      }
+      else
+      {
+         FlowBindingFilter filter = flowBinding.getFilter();
+         
+         filter.updateInfo(info);
+      }
+   }
+   
+   private void removeQueueInfo()
+   {
+      //TODO
+   }
+   
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
+
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {
+      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+      if (!config.getParams().isEmpty())
+      {
+         str.append("?");
+      }
+
+      boolean first = true;
+      for (Map.Entry<String, Object> entry : config.getParams().entrySet())
+      {
+         if (!first)
+         {
+            str.append("&");
+         }
+         String encodedKey = replaceWildcardChars(entry.getKey());
+
+         String val = entry.getValue().toString();
+         String encodedVal = replaceWildcardChars(val);
+
+         str.append(encodedKey).append('=').append(encodedVal);
+
+         first = false;
+      }
+
+      return new SimpleString(str.toString());
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,122 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBindingFilter
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 20:53:20
+ *
+ *
+ */
+public class FlowBindingFilter implements Filter
+{
+   private volatile QueueInfo info;
+   
+   private final Map<String, Filter> filters = new HashMap<String, Filter>();
+   
+   public FlowBindingFilter(final QueueInfo info) throws Exception
+   {
+      this.info = info;
+      
+      updateInfo(info);
+   }
+   
+   public void updateInfo(final QueueInfo info) throws Exception
+   {
+      this.info = info;
+      
+      List<String> filterStrings = info.getFilterStrings();
+      
+      for (String filterString: filterStrings)
+      {
+         Filter filter = filters.get(filterString);
+         
+         if (filter == null)
+         {
+            filter = new FilterImpl(new SimpleString(filterString));
+            
+            filters.put(filterString, filter);
+         }
+      }
+      
+      //TODO - this can be optimised by storing map of filters in QueueInfo
+      if (filterStrings.size() < filters.size())
+      {
+         Iterator<String> iter = filters.keySet().iterator();
+         
+         while (iter.hasNext())
+         {
+            String filterString = iter.next();
+            
+            if (!filterStrings.contains(filterString))
+            {
+               iter.remove();
+            }
+         }
+      }
+   }
+         
+   public SimpleString getFilterString()
+   {         
+      return null;
+   }
+
+   public boolean match(final ServerMessage message)
+   {
+      if (info.getNumberOfConsumers() == 0)
+      {
+         return false;
+      }
+      
+      if (filters.isEmpty())
+      {
+         return true;
+      }
+      else
+      {         
+         for (Filter filter: filters.values())
+         {
+            if (filter.match(message))
+            {
+               return true;
+            }
+         }
+         return false;
+      }
+   }
+   
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.List;
+
+/**
+ * A QueueInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 20:55:06
+ *
+ *
+ */
+public class QueueInfo
+{
+   private final String queueName;
+   
+   private final String address;
+   
+   private final List<String> filterStrings;
+   
+   private final int numberOfConsumers;
+
+   public QueueInfo(final String queueName, final String address, final List<String> filterStrings, final int numberOfConsumers)
+   {
+      this.queueName = queueName;
+      this.address = address;
+      this.filterStrings = filterStrings;
+      this.numberOfConsumers = numberOfConsumers;
+   }
+
+   public String getQueueName()
+   {
+      return queueName;
+   }
+
+   public String getAddress()
+   {
+      return address;
+   }
+
+   public List<String> getFilterStrings()
+   {
+      return filterStrings;
+   }
+
+   public int getNumberOfConsumers()
+   {
+      return numberOfConsumers;
+   }            
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -43,13 +43,10 @@
 import org.jboss.messaging.core.persistence.QueueBindingInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.DivertBinding;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.DivertBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -64,6 +61,7 @@
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
+import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.Divert;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
@@ -291,11 +289,11 @@
                                                 true,
                                                 false);
 
-         Binding binding = new QueueBindingImpl(queueBindingInfo.getAddress(), queue);
+         Binding binding = new BindingImpl(queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), queueBindingInfo.getQueueName(), queue, false, true);
 
          queues.put(queueBindingInfo.getPersistenceID(), queue);
 
-         postOffice.addBinding(binding);
+         postOffice.addBinding(binding);         
       }
 
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
@@ -801,7 +799,7 @@
 
             Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
 
-            QueueBinding queueBinding = new QueueBindingImpl(new SimpleString(config.getAddress()), queue);
+            Binding queueBinding = new BindingImpl(new SimpleString(config.getAddress()), name, name, queue, false, true);
             
             binding = queueBinding;
 
@@ -870,7 +868,7 @@
                                         pagingManager,
                                         storageManager);
 
-         DivertBinding binding = new DivertBindingImpl(sAddress, divert);
+         Binding binding = new BindingImpl(sAddress, sName, new SimpleString(config.getRoutingName()), divert, config.isExclusive(), false);
 
          postOffice.addBinding(binding);
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -361,8 +361,7 @@
          else
          {
             ref.getQueue().acknowledge(tx, ref);
-            // ref.acknowledge(tx, storageManager, postOffice, queueSettingsRepository);
-
+            
             // Del count is not actually updated in storage unless it's
             // cancelled
             ref.incrementDeliveryCount();
@@ -559,6 +558,8 @@
             return HandleStatus.BUSY;
          }
 
+         //TODO use a null or boolean check here for performance
+         
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (pendingLargeMessagesCounter.get() > 0)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -35,8 +35,7 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.FailureListener;
@@ -1475,13 +1474,11 @@
 
          final Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
 
-         binding = new QueueBindingImpl(address, queue);
+         binding = new BindingImpl(address, name, name, queue, false, true);
 
          if (durable)
          {
-            QueueBinding queueBinding = (QueueBinding)binding;
-
-            storageManager.addQueueBinding(queueBinding);
+            storageManager.addQueueBinding(binding);                        
          }
 
          postOffice.addBinding(binding);
@@ -1659,7 +1656,7 @@
             {
                if (binding.isQueueBinding())
                {
-                  names.add(binding.getBindable().getUniqueName());
+                  names.add(binding.getUniqueName());
                }
             }
          }

Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/schemas/jbm-configuration.xsd	2009-01-22 11:16:47 UTC (rev 5686)
@@ -187,10 +187,10 @@
 		<xsd:complexType>
 			<xsd:sequence>
 				<xsd:element ref="local-bind-address" maxOccurs="1"
-					minOccurs="1">
+					minOccurs="0">
 				</xsd:element>
 				<xsd:element ref="local-bind-port" maxOccurs="1"
-					minOccurs="1">
+					minOccurs="0">
 				</xsd:element>
 				<xsd:element ref="group-address" maxOccurs="1"
 					minOccurs="1">
@@ -228,26 +228,6 @@
 		</xsd:complexType>
 	</xsd:element>
 
-	<xsd:element name="message-flow">
-		<xsd:complexType>
-			<xsd:sequence>
-				<xsd:element name="address" type="xsd:string"></xsd:element>
-				<xsd:choice>
-					<xsd:element ref="discovery-group-ref" minOccurs="1"
-						maxOccurs="1">
-					</xsd:element>
-					<xsd:element ref="connector-ref" minOccurs="1"
-						maxOccurs="unbounded">
-					</xsd:element>
-				</xsd:choice>
-				<xsd:element name="exclusive" type="xsd:boolean"
-					minOccurs="0" maxOccurs="1">
-				</xsd:element>
-			</xsd:sequence>
-			<xsd:attribute name="name" type="xsd:ID" use="required"></xsd:attribute>
-		</xsd:complexType>
-	</xsd:element>
-
 	<xsd:element name="discovery-group-ref">
 		<xsd:complexType>
 			<xsd:attribute name="discovery-group-name"
@@ -351,9 +331,6 @@
 			<xsd:element name="use-duplicate-detection"
 				type="xsd:boolean" maxOccurs="1" minOccurs="0">
 			</xsd:element>
-			<xsd:element name="max-hops" type="xsd:int" maxOccurs="1"
-				minOccurs="0">
-			</xsd:element>
 			<xsd:choice>
 				<xsd:element name="connector-ref"
 					type="connector-refType" minOccurs="1" maxOccurs="1">
@@ -373,14 +350,17 @@
 
 	<xsd:complexType name="divertType">
 		<xsd:sequence>
-			<xsd:element name="routing-name" type="xsd:string"
+		   <xsd:element name="unique-name" type="xsd:string"
 				maxOccurs="1" minOccurs="1">
 			</xsd:element>
+			<xsd:element name="routing-name" type="xsd:string"
+				maxOccurs="1" minOccurs="0">
+			</xsd:element>
 			<xsd:element name="address" type="xsd:string" maxOccurs="1"
 				minOccurs="1">
 			</xsd:element>
 			<xsd:element name="forwarding-address" type="xsd:string"
-				maxOccurs="1" minOccurs="0">
+				maxOccurs="1" minOccurs="1">
 			</xsd:element>
 			<xsd:element name="filter" type="filterType" maxOccurs="1"
 				minOccurs="0">
@@ -408,8 +388,6 @@
 		</xsd:attribute>
 	</xsd:complexType>
 
-
-
 	<xsd:complexType name="filterType">
 		<xsd:attribute name="string" type="xsd:string" use="required"></xsd:attribute>
 	</xsd:complexType>

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,148 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.failover;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-
-/**
- * A LargeMessageMuliThreadFailoverTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
- * Created Jan 18, 2009 4:52:09 PM
- *
- *
- */
-public class LargeMessageMuliThreadFailoverTest extends MultiThreadRandomFailoverTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-   
-   protected ClientSessionFactoryInternal createSessionFactory()
-   {
-      ClientSessionFactoryInternal sf = super.createSessionFactory();
-      
-      sf.setMinLargeMessageSize(200);
-      
-      return sf;
-
-   }
-
-
-   @Override
-   protected void start() throws Exception
-   {
-
-      deleteDirectory(new File(getTestDir()));
-
-      Configuration backupConf = new ConfigurationImpl();
-
-      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
-      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
-      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
-      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
-      backupConf.setJournalFileSize(100 * 1024);
-
-      backupConf.setSecurityEnabled(false);
-      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      backupConf.getAcceptorConfigurations()
-                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
-      backupConf.setBackup(true);
-
-      backupService = MessagingServiceImpl.newMessagingService(backupConf);
-      backupService.start();
-
-      Configuration liveConf = new ConfigurationImpl();
-
-      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
-      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
-      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
-      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
-
-      liveConf.setJournalFileSize(100 * 1024);
-
-      liveConf.setSecurityEnabled(false);
-      liveConf.getAcceptorConfigurations()
-              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
-
-      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
-      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
-                                                                   backupParams,
-                                                                   "backup-connector");
-      connectors.put(backupTC.getName(), backupTC);
-      liveConf.setConnectorConfigurations(connectors);
-      liveConf.setBackupConnectorName(backupTC.getName());
-      liveService = MessagingServiceImpl.newMessagingService(liveConf);
-
-      liveService.start();
-
-   }
-   
-   @Override
-   protected void setBody(final ClientMessage message) throws Exception
-   {
-      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(500)));
-
-   }
-
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
-    */
-   @Override
-   protected boolean checkSize(ClientMessage message)
-   {
-      return 500 ==  message.getBodySize();
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java (from rev 5683, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,148 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+
+/**
+ * A LargeMessageMultiThreadFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Jan 18, 2009 4:52:09 PM
+ *
+ *
+ */
+public class LargeMessageMultiThreadFailoverTest extends MultiThreadRandomFailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      ClientSessionFactoryInternal sf = super.createSessionFactory();
+      
+      sf.setMinLargeMessageSize(200);
+      
+      return sf;
+
+   }
+
+
+   @Override
+   protected void start() throws Exception
+   {
+
+      deleteDirectory(new File(getTestDir()));
+
+      Configuration backupConf = new ConfigurationImpl();
+
+      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+      backupConf.setJournalFileSize(100 * 1024);
+
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+      backupConf.setBackup(true);
+
+      backupService = MessagingServiceImpl.newMessagingService(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+
+      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+      liveConf.setJournalFileSize(100 * 1024);
+
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+      liveService.start();
+
+   }
+   
+   @Override
+   protected void setBody(final ClientMessage message) throws Exception
+   {
+      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(500)));
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+    */
+   @Override
+   protected boolean checkSize(ClientMessage message)
+   {
+      return 500 ==  message.getBodySize();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -1243,7 +1243,7 @@
 
    protected int getNumIterations()
    {
-      return 1;
+      return 20;
    }
 
    @Override

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -33,6 +34,30 @@
  */
 public class FakeBinding implements Binding
 {
+   public boolean accept(ServerMessage message) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
+   public SimpleString getRoutingName()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
+   public boolean isExclusive()
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
    private SimpleString address;
 
    private Bindable bindable;

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -23,7 +23,7 @@
 
 package org.jboss.messaging.tests.stress.failover;
 
-import org.jboss.messaging.tests.integration.cluster.failover.LargeMessageMuliThreadFailoverTest;
+import org.jboss.messaging.tests.integration.cluster.failover.LargeMessageMultiThreadFailoverTest;
 
 /**
  * A LargeMessageMultiThreadFailoverStressTest
@@ -34,7 +34,7 @@
  *
  *
  */
-public class LargeMessageMultiThreadFailoverStressTest extends LargeMessageMuliThreadFailoverTest
+public class LargeMessageMultiThreadFailoverStressTest extends LargeMessageMultiThreadFailoverTest
 {
 
    // Constants -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -75,83 +75,7 @@
 
    // Public --------------------------------------------------------
 
-   public void disabledtestFoo() throws Exception
-   {
-      
-      final int numberOfInserts = 50000;
-      
-     // final int numberOfAddRefs = 10;
-      
-//      for (long i = 1; i <= 10000; i++)
-//      {
-//         if (i % 10000 == 0)
-//         {
-//            System.out.println("Append " + i);
-//         }
-//         impl.appendAddRecord(i, (byte)12, encoding);
-//         
-//         for (int j = 0; j < numberOfAddRefs; j++)
-//         {
-//            impl.appendUpdateRecord(i, (byte)12, updateEncoding);
-//         }
-//      }
-//      
-      System.out.println("Starting");
-      
-//      for (int numberOfAddRefs = 0; numberOfAddRefs < 10; numberOfAddRefs++)
-//      {  
-      final int numberOfAddRefs = 1;
-      while (true)
-      {
-         File file = new File(getTestDir());
-         deleteDirectory(file);
-         file.mkdirs();
-         
-         SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
-         JournalImpl impl = new JournalImpl(10 * 1024 * 1024, 10, true, false, factory, "jbm", "jbm", 1000, 1000);
-
-         impl.start();
-
-         impl.load(dummyLoader);
-         
-         System.out.println("Loaded");
-         
-         SimpleEncoding encoding = new SimpleEncoding(1024, (byte)'f');
-         
-         SimpleEncoding updateEncoding = new SimpleEncoding(8, (byte)'f');
-              
-         
-         long start = System.currentTimeMillis();
-         
-         for (long i = 1; i <= numberOfInserts; i++)
-         {
-            if (i % 10000 == 0)
-            {
-               System.out.println("Append " + i);
-            }
-            impl.appendAddRecord(i, (byte)12, encoding);
-            
-            for (int j = 0; j < numberOfAddRefs; j++)
-            {
-               impl.appendUpdateRecord(i, (byte)12, updateEncoding);
-            }
-         }
-         
-         long end = System.currentTimeMillis();
-         
-         double rate = 1000 * ((double)numberOfInserts) / (end - start);
    
-         System.out.println("rate " + rate);
-   
-         impl.stop();
-      
-      }
-
-     
-
-   }
-   
-   
    public void testInsertAndLoad() throws Exception
    {
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java	2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java	2009-01-22 11:16:47 UTC (rev 5686)
@@ -21,16 +21,10 @@
  */
 package org.jboss.messaging.tests.unit.core.postoffice.impl;
 
-import org.easymock.EasyMock;
 import org.jboss.messaging.core.postoffice.AddressManager;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.BindingType;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
 import org.jboss.messaging.core.postoffice.impl.SimpleAddressManager;
-import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
 import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>




More information about the jboss-cvs-commits mailing list