[jboss-cvs] JBoss Messaging SVN: r5381 - in trunk: src/main/org/jboss/messaging/core/config/cluster and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 18 06:58:28 EST 2008


Author: timfox
Date: 2008-11-18 06:58:28 -0500 (Tue, 18 Nov 2008)
New Revision: 5381

Added:
   trunk/src/main/org/jboss/messaging/core/config/cluster/
   trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
   trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java
   trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/
   trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java
   trunk/src/main/org/jboss/messaging/core/server/Forwarder.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
Log:
Clustering


Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -26,6 +26,9 @@
 import java.util.List;
 import java.util.Set;
 
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.server.JournalType;
 import org.jboss.messaging.util.SimpleString;
 
@@ -89,9 +92,17 @@
 
    void setBackupConnectorConfiguration(TransportConfiguration config);
    
-   Set<OutflowConfiguration> getOutflowConfigurations();
+   Set<BroadcastGroupConfiguration> getBroadcastGroupConfigurations();
+   
+   void setBroadcastGroupConfigurations(Set<BroadcastGroupConfiguration> configs);
+   
+   Set<DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations();
+   
+   void setDiscoveryGroupConfigurations(Set<DiscoveryGroupConfiguration> configs);
+   
+   Set<MessageFlowConfiguration> getMessageFlowConfigurations();
 
-   void setOutFlowConfigurations(final Set<OutflowConfiguration> configs);
+   void setMessageFlowConfigurations(final Set<MessageFlowConfiguration> configs);
    
    SimpleString getManagementAddress();
    

Deleted: trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.config;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * A OutflowConfiguration
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 12 Nov 2008 13:52:22
- *
- *
- */
-public class OutflowConfiguration implements Serializable
-{
-   private static final long serialVersionUID = 6583525368508418953L;
-
-   private final String name;
-   
-   private final String address;
-   
-   private final String filterString;
-   
-   private final boolean fanout;
-   
-   private final int maxBatchSize;
-   
-   private final long maxBatchTime;
-   
-   private final List<TransportConfiguration> connectors;
-
-   public OutflowConfiguration(final String name,
-                               final String address,
-                               final String filterString,
-                               final boolean fanout,
-                               final int maxBatchSize,
-                               final long maxBatchTime,
-                               final List<TransportConfiguration> connectors)
-   {    
-      this.name = name;
-      this.address = address;
-      this.filterString = filterString;
-      this.fanout = fanout;
-      this.maxBatchSize = maxBatchSize;
-      this.maxBatchTime = maxBatchTime;
-      this.connectors = connectors;
-   }
-
-   public String getName()
-   {
-      return name;
-   }
-   
-   public String getAddress()
-   {
-      return address;
-   }
-
-   public String getFilterString()
-   {
-      return filterString;
-   }
-
-   public boolean isFanout()
-   {
-      return fanout;
-   }
-
-   public int getMaxBatchSize()
-   {
-      return maxBatchSize;
-   }
-
-   public long getMaxBatchTime()
-   {
-      return maxBatchTime;
-   }
-
-   public List<TransportConfiguration> getConnectors()
-   {
-      return connectors;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -62,15 +62,42 @@
       return params;
    }
    
-   public boolean equals(Object other)
+   /**
+    * Returns a hash derived from the factory class name only. Instances that
+    * are equal always share a factory class name, so this is consistent with
+    * {@link #equals(Object)}.
+    */
+   public int hashCode()
    {
+      return factoryClassName.hashCode();
+   }
+   
+   /**
+    * Two configurations are equal when they have the same factory class name
+    * and equal parameter maps (both null, or same keys mapped to equal values).
+    */
+   public boolean equals(final Object other)
+   {
       if (other instanceof TransportConfiguration == false)
       {
          return false;
       }
       
-      TransportConfiguration ai = (TransportConfiguration)other;
-      
-      return this.factoryClassName.equals(ai.factoryClassName);
+      TransportConfiguration kother = (TransportConfiguration)other;
+
+      if (!factoryClassName.equals(kother.factoryClassName))
+      {
+         return false;
+      }
+
+      if (params == null)
+      {
+         return kother.params == null;
+      }
+
+      // Map.equals is null-safe for values, so an entry mapped to null no longer
+      // makes a configuration unequal to itself; it also returns false for null.
+      return params.equals(kother.params);
    }
+   
 }

Added: trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+
+/**
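+ * Immutable, serializable settings for a single broadcast group: its name,
+ * the local bind address and port, the group address and port, and the
+ * broadcast period.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 08:44:30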
+ */
+public class BroadcastGroupConfiguration implements Serializable
+{
+   private static final long serialVersionUID = 1052413739064253955L;
+   
+   private final String name;
+   
+   private final String localBindAddress;
+   
+   private final int localBindPort;
+   
+   private final String groupAddress;
+   
+   private final int groupPort;
+   
+   private final long broadcastPeriod;
+
+   public BroadcastGroupConfiguration(final String name,
+                                      final String localBindAddress,
+                                      final int localBindPort,
+                                      final String groupAddress,
+                                      final int groupPort,
+                                      final long broadcastPeriod)
+   {
+      super();
+      this.name = name;
+      this.localBindAddress = localBindAddress;
+      this.localBindPort = localBindPort;
+      this.groupAddress = groupAddress;
+      this.groupPort = groupPort;
+      this.broadcastPeriod = broadcastPeriod;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getLocalBindAddress()
+   {
+      return localBindAddress;
+   }
+
+   public int getLocalBindPort()
+   {
+      return localBindPort;
+   }
+
+   public String getGroupAddress()
+   {
+      return groupAddress;
+   }
+
+   public int getGroupPort()
+   {
+      return groupPort;
+   }
+
+   public long getBroadcastPeriod()
+   {
+      return broadcastPeriod;
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable settings for a single discovery group: its name,
+ * the group address and port, and the refresh timeout.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 08:47:30
+ *
+ */
+public class DiscoveryGroupConfiguration implements Serializable
+{
+   private static final long serialVersionUID = 8657206421727863400L;
+
+   private final String name;
+   
+   private final String groupAddress;
+   
+   private final int groupPort;
+   
+   private final long refreshTimeout;
+
+   public DiscoveryGroupConfiguration(final String name,                      
+                                      final String groupAddress,
+                                      final int groupPort,
+                                      final long refreshTimeout)
+   {
+      super();
+      this.name = name;
+      this.groupAddress = groupAddress;
+      this.groupPort = groupPort;
+      this.refreshTimeout = refreshTimeout;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getGroupAddress()
+   {
+      return groupAddress;
+   }
+
+   public int getGroupPort()
+   {
+      return groupPort;
+   }
+
+   public long getRefreshTimeout()
+   {
+      return refreshTimeout;
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+
+/**
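+ * Serializable settings for a message flow. A flow is configured with either
+ * a static list of connectors or the name of a discovery group, depending on
+ * the constructor used; the getter for the other value returns null.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 12 Nov 2008 13:52:22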
+ */
+public class MessageFlowConfiguration implements Serializable
+{
+   private static final long serialVersionUID = 6583525368508418953L;
+
+   private final String name;
+
+   private final String address;
+
+   private final String filterString;
+
+   private final boolean fanout;
+
+   private final int maxBatchSize;
+
+   private final long maxBatchTime;
+
+   private final List<TransportConfiguration> staticConnectors;
+
+   private final String discoveryGroupName;
+
+   private final String transformerClassName;
+
+   public MessageFlowConfiguration(final String name,
+                                   final String address,
+                                   final String filterString,
+                                   final boolean fanout,
+                                   final int maxBatchSize,
+                                   final long maxBatchTime,
+                                   final String transformerClassName,
+                                   final List<TransportConfiguration> connectors)
+   {
+      this.name = name;
+      this.address = address;
+      this.filterString = filterString;
+      this.fanout = fanout;
+      this.maxBatchSize = maxBatchSize;
+      this.maxBatchTime = maxBatchTime;
+      this.transformerClassName = transformerClassName;
+      this.staticConnectors = connectors;
+      this.discoveryGroupName = null;
+   }
+
+   public MessageFlowConfiguration(final String name,
+                                   final String address,
+                                   final String filterString,
+                                   final boolean fanout,
+                                   final int maxBatchSize,
+                                   final long maxBatchTime,
+                                   final String transformerClassName,
+                                   final String discoveryGroupName)
+   {
+      this.name = name;
+      this.address = address;
+      this.filterString = filterString;
+      this.fanout = fanout;
+      this.maxBatchSize = maxBatchSize;
+      this.maxBatchTime = maxBatchTime;
+      this.transformerClassName = transformerClassName;
+      this.staticConnectors = null;
+      this.discoveryGroupName = discoveryGroupName;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getAddress()
+   {
+      return address;
+   }
+
+   public String getFilterString()
+   {
+      return filterString;
+   }
+
+   public boolean isFanout()
+   {
+      return fanout;
+   }
+
+   public int getMaxBatchSize()
+   {
+      return maxBatchSize;
+   }
+
+   public long getMaxBatchTime()
+   {
+      return maxBatchTime;
+   }
+
+   public String getTransformerClassName()
+   {
+      return transformerClassName;
+   }
+
+   public List<TransportConfiguration> getConnectors()
+   {
+      return staticConnectors;
+   }
+
+   public String getDiscoveryGroupName()
+   {
+      return this.discoveryGroupName;
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -18,8 +18,10 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.server.JournalType;
 import org.jboss.messaging.util.SimpleString;
 
@@ -111,8 +113,13 @@
 
    protected TransportConfiguration backupConnectorConfig;
    
-   protected Set<OutflowConfiguration> outFlowConfigs = new HashSet<OutflowConfiguration>();
+   protected Set<MessageFlowConfiguration> outFlowConfigs = new HashSet<MessageFlowConfiguration>();
+   
+   protected Set<BroadcastGroupConfiguration> broadcastGroupConfigurations = new HashSet<BroadcastGroupConfiguration>();
+   
+   protected Set<DiscoveryGroupConfiguration> discoveryGroupConfigurations = new HashSet<DiscoveryGroupConfiguration>();
 
+      
    // Paging related attributes
 
    protected long pagingMaxGlobalSize = -1;
@@ -253,16 +260,37 @@
       backupConnectorConfig = config;
    }
    
-   public Set<OutflowConfiguration> getOutflowConfigurations()
+   public Set<MessageFlowConfiguration> getMessageFlowConfigurations()
    {
       return outFlowConfigs;
    }
 
-   public void setOutFlowConfigurations(final Set<OutflowConfiguration> configs)
+   public void setMessageFlowConfigurations(final Set<MessageFlowConfiguration> configs)
    {
       this.outFlowConfigs = configs;
    }
+   
+   public Set<BroadcastGroupConfiguration> getBroadcastGroupConfigurations()
+   {
+      return broadcastGroupConfigurations;
+   }
 
+   public void setBroadcastGroupConfigurations(Set<BroadcastGroupConfiguration> broadcastGroupConfigurations)
+   {
+      this.broadcastGroupConfigurations = broadcastGroupConfigurations;
+   }
+
+   public Set<DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations()
+   {
+      return discoveryGroupConfigurations;
+   }
+
+   public void setDiscoveryGroupConfigurations(Set<DiscoveryGroupConfiguration> discoveryGroupConfigurations)
+   {
+      this.discoveryGroupConfigurations = discoveryGroupConfigurations;
+   }
+
+
    public String getBindingsDirectory()
    {
       return bindingsDirectory;
@@ -491,5 +519,7 @@
              cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval() &&
              cother.getManagementAddress().equals(getManagementAddress());
    }
+   
+   
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -46,6 +46,7 @@
 * JBMTimestamp - the timestamp of the message
 * JBMDurable - "DURABLE" or "NON_DURABLE"
 * JBMExpiration - the expiration of the message
+* JBMSize - the encoded size of the full message in bytes
 * Any other identifers that appear in a filter expression represent header values for the message
 * 
 * String values must be set as <code>SimpleString</code>, not <code>java.lang.String</code> (see JBMESSAGING-1307).
@@ -81,6 +82,8 @@
   private static final SimpleString JBM_PRIORITY = new SimpleString("JBMPriority");
 
   private static final SimpleString JBM_MESSAGE_ID = new SimpleString("JBMMessageID");
+  
+  private static final SimpleString JBM_SIZE = new SimpleString("JBMSize");
 
   private static final SimpleString JBM_PREFIX = new SimpleString("JBM");
    
@@ -180,6 +183,10 @@
      {
         return msg.getExpiration();
      }
+     else if (JBM_SIZE.equals(fieldName))
+     {
+        return msg.getEncodeSize();
+     }
      else
      {
         return null;

Deleted: trunk/src/main/org/jboss/messaging/core/server/Forwarder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Forwarder.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/Forwarder.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server;
-
-
-/**
- * A Forwarder
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 15 Nov 2008 09:42:31
- *
- *
- */
-public interface Forwarder extends Consumer
-{
-   void close() throws Exception;
-}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
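+ * A messaging component to which connectors can be added and removed, and
+ * which broadcasts its connectors when broadcastConnectors() is called.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:29:45
+ *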
+ */
+public interface BroadcastGroup extends MessagingComponent
+{
+   void addConnector(final TransportConfiguration connector);
+
+   void removeConnector(final TransportConfiguration connector);
+
+   int size();
+
+   void broadcastConnectors() throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
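+ * A messaging component that deploys broadcast groups, discovery groups and
+ * message flows from their configurations, and undeploys each of them by name.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:23:26
+ *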
+ */
+public interface ClusterManager extends MessagingComponent
+{
+   void deployBroadcastGroup(BroadcastGroupConfiguration broadcastGroupConfig) throws Exception;
+   
+   void deployDiscoveryGroup(DiscoveryGroupConfiguration discoveryGroupConfig) throws Exception;
+   
+   void deployMessageFlow(MessageFlowConfiguration messageFlowConfig) throws Exception;
+   
+   void undeployBroadcastGroup(String name) throws Exception;
+   
+   void undeployDiscoveryGroup(String name) throws Exception;
+   
+   void undeployMessageFlow(String name) throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A DiscoveryGroup
+ *
+ * Exposes the connectors learned from received broadcasts and lets interested parties be
+ * told when a broadcast has been processed. The behaviour notes below describe
+ * DiscoveryGroupImpl, the implementation added alongside this interface.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:26:54
+ *
+ *
+ */
+public interface DiscoveryGroup extends MessagingComponent
+{
+   /**
+    * Returns the connectors currently known to this group. DiscoveryGroupImpl returns a
+    * fresh list on every call, so the caller may modify the result freely.
+    *
+    * @return the known connectors
+    */
+   List<TransportConfiguration> getConnectors();
+   
+   /**
+    * Blocks until a broadcast has been received or the timeout has elapsed.
+    * DiscoveryGroupImpl treats the timeout as milliseconds and clears its "received" flag
+    * before returning, so each received broadcast satisfies at most one successful wait.
+    *
+    * @param timeout the maximum time to wait
+    * @return true if a broadcast was received, false if the wait timed out
+    */
+   boolean waitForBroadcast(long timeout);
+   
+   /**
+    * Adds a listener to be called back by this group.
+    *
+    * @param listener the listener to add
+    */
+   void registerListener(final DiscoveryListener listener);
+   
+   /**
+    * Removes a previously registered listener.
+    *
+    * @param listener the listener to remove
+    */
+   void unregisterListener(final DiscoveryListener listener);
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+/**
+ * A DiscoveryListener
+ *
+ * Callback registered with a {@link DiscoveryGroup} via
+ * {@link DiscoveryGroup#registerListener(DiscoveryListener)}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 17 Nov 2008 14:30:39
+ *
+ *
+ */
+public interface DiscoveryListener
+{
+   /**
+    * Signals that the group's connectors may have changed. The callback carries no
+    * payload; the current connectors are obtained from {@link DiscoveryGroup#getConnectors()}.
+    *
+    * DiscoveryGroupImpl invokes this after every broadcast it processes, whether or not the
+    * set of connectors actually differs from before, and does so from its receiver thread
+    * while holding its own monitor. Throwables raised here are caught and logged by
+    * DiscoveryGroupImpl so that the remaining listeners still run.
+    */
+   void connectorsChanged();
+}

Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java (from rev 5368, trunk/src/main/org/jboss/messaging/core/server/Forwarder.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+
+/**
+ * A Forwarder
+ *
+ * Combines the {@link Consumer} and {@link MessagingComponent} contracts in a single type;
+ * it declares no methods of its own.
+ * NOTE(review): presumably implementations consume messages in order to forward them
+ * elsewhere - confirm against ForwarderImpl.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 09:42:31
+ *
+ *
+ */
+public interface Forwarder extends Consumer, MessagingComponent
+{
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A MessageFlow
+ *
+ * A {@link MessagingComponent} that can be deployed and undeployed by name through
+ * ClusterManager. The interface currently adds no methods beyond the inherited
+ * lifecycle operations.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:41:01
+ *
+ *
+ */
+public interface MessageFlow extends MessagingComponent
+{
+   
+
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ * A Transformer
+ *
+ * Maps one {@link ServerMessage} to another. ClusterManagerImpl creates the transformer
+ * for a message flow reflectively from a configured class name using the no-argument
+ * constructor, so implementations need an accessible no-argument constructor.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 11:01:50
+ *
+ *
+ */
+public interface Transformer
+{
+   /**
+    * Transforms the given message.
+    * NOTE(review): whether implementations may modify and return the same instance, or
+    * may return null, is not specified here - confirm against the caller.
+    *
+    * @param message the message to transform
+    * @return the transformed message
+    */
+   ServerMessage transform(ServerMessage message);
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+
+/**
+ * A BroadcastGroupImpl
+ *
+ * Serializes the registered connector configurations into a single datagram and sends it
+ * to the configured group address and port. The instance is a {@link Runnable} so that it
+ * can be scheduled for periodic execution; each run performs one broadcast while the
+ * group is started.
+ *
+ * The connector list and the socket are guarded by this object's monitor, because
+ * connectors may be added or removed, and the group may be stopped, while a scheduled
+ * broadcast is in progress on another thread.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 09:45:32
+ *
+ */
+public class BroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+   private static final Logger log = Logger.getLogger(BroadcastGroupImpl.class);
+
+   // Currently unused - see the FIXME in the constructor
+   private final InetAddress localBindAddress;
+
+   // Currently unused - see the FIXME in the constructor
+   private final int localPort;
+
+   private final InetAddress groupAddress;
+
+   private final int groupPort;
+
+   // Guarded by this; null until the group has been started for the first time
+   private DatagramSocket socket;
+
+   // Guarded by this
+   private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+   private volatile boolean started;
+
+   private volatile ScheduledFuture<?> future;
+
+   /**
+    * Creates a stopped broadcast group. No socket is opened until {@link #start()}.
+    *
+    * @param localBindAddress the local address to bind to (currently not used)
+    * @param localPort the local port to bind to (currently not used)
+    * @param groupAddress the address the datagrams are sent to
+    * @param groupPort the port the datagrams are sent to
+    */
+   public BroadcastGroupImpl(final InetAddress localBindAddress,
+                             final int localPort,
+                             final InetAddress groupAddress,
+                             final int groupPort) throws Exception
+   {
+      this.localBindAddress = localBindAddress;
+
+      this.localPort = localPort;
+
+      this.groupAddress = groupAddress;
+
+      this.groupPort = groupPort;
+
+      // FIXME - doesn't seem to work when specifying port and address
+
+      // this.socket = new DatagramSocket(localPort, localBindAddress);
+   }
+
+   /**
+    * Opens the sending socket. Does nothing if the group is already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      socket = new DatagramSocket();
+
+      started = true;
+   }
+
+   /**
+    * Cancels the scheduled broadcast task, if one was supplied, and closes the socket.
+    * Does nothing if the group is not started.
+    */
+   public synchronized void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (future != null)
+      {
+         future.cancel(false);
+      }
+
+      socket.close();
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * Adds a connector to the set that is broadcast.
+    */
+   public synchronized void addConnector(final TransportConfiguration connector)
+   {
+      connectors.add(connector);
+   }
+
+   /**
+    * Removes a connector from the set that is broadcast.
+    */
+   public synchronized void removeConnector(final TransportConfiguration connector)
+   {
+      connectors.remove(connector);
+   }
+
+   /**
+    * @return the number of connectors currently registered
+    */
+   public synchronized int size()
+   {
+      return connectors.size();
+   }
+
+   /**
+    * Sends one datagram containing the connector count followed by every registered
+    * connector, written with Java serialization.
+    *
+    * @throws IllegalStateException if the group has never been started
+    * @throws Exception if serialization or sending fails
+    */
+   public synchronized void broadcastConnectors() throws Exception
+   {
+      if (socket == null)
+      {
+         // Fail with a meaningful message instead of a NullPointerException on send
+         throw new IllegalStateException("Broadcast group has not been started");
+      }
+
+      // TODO - for now we just use plain serialization to serialize the transport configs
+      // we should use our own format for a tighter representation
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+      ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+      oos.writeInt(connectors.size());
+
+      for (TransportConfiguration connector : connectors)
+      {
+         oos.writeObject(connector);
+      }
+
+      oos.flush();
+
+      byte[] data = bos.toByteArray();
+
+      DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+
+      log.info("broadcasting packet");
+
+      socket.send(packet);
+   }
+
+   /**
+    * Performs one broadcast if the group is started; failures are logged, never thrown,
+    * so that a periodic schedule keeps running.
+    */
+   public void run()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      try
+      {
+         broadcastConnectors();
+      }
+      catch (Exception e)
+      {
+         if (!started)
+         {
+            // The group was stopped between the check above and the send - not an error
+            return;
+         }
+
+         log.error("Failed to broadcast connector configs", e);
+      }
+   }
+
+   /**
+    * Supplies the handle of the periodic task running this group, so that
+    * {@link #stop()} can cancel it.
+    */
+   public void setScheduledFuture(final ScheduledFuture<?> future)
+   {
+      this.future = future;
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,303 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterManagerImpl
+ *
+ * Keeps one registry per component kind (broadcast groups, discovery groups, message
+ * flows), keyed by the configured name. Every public method is synchronized on the
+ * manager, so deployments, undeployments and lifecycle changes never interleave.
+ *
+ * A component is started before it is registered, so a component that fails to start
+ * is never left behind under its name. Undeploying removes the component from its
+ * registry, which makes the name available for a later deployment.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:23:49
+ *
+ *
+ */
+public class ClusterManagerImpl implements ClusterManager
+{
+   private final Map<String, BroadcastGroup> broadcastGroups = new HashMap<String, BroadcastGroup>();
+
+   private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
+
+   private final Map<String, MessageFlow> messageFlows = new HashMap<String, MessageFlow>();
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private volatile boolean started;
+
+   public ClusterManagerImpl(final ExecutorFactory executorFactory,
+                             final StorageManager storageManager,
+                             final PostOffice postOffice,
+                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                             final ScheduledExecutorService scheduledExecutor)
+   {
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   /**
+    * Starts every deployed component. Does nothing if the manager is already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group : broadcastGroups.values())
+      {
+         group.start();
+      }
+
+      for (DiscoveryGroup group : discoveryGroups.values())
+      {
+         group.start();
+      }
+
+      for (MessageFlow flow : messageFlows.values())
+      {
+         flow.start();
+      }
+
+      started = true;
+   }
+
+   /**
+    * Stops every deployed component. Does nothing if the manager is not started.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group : broadcastGroups.values())
+      {
+         group.stop();
+      }
+
+      for (DiscoveryGroup group : discoveryGroups.values())
+      {
+         group.stop();
+      }
+
+      for (MessageFlow flow : messageFlows.values())
+      {
+         flow.stop();
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * Creates and starts a broadcast group, schedules its periodic broadcast and
+    * registers it under the configured name.
+    *
+    * @throws IllegalArgumentException if a broadcast group with that name is already deployed
+    */
+   public synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
+   {
+      if (broadcastGroups.containsKey(config.getName()))
+      {
+         throw new IllegalArgumentException("There is already a broadcast-group with name " + config.getName() +
+                                            " deployed");
+      }
+
+      InetAddress localBindAddress = InetAddress.getByName(config.getLocalBindAddress());
+
+      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+      BroadcastGroupImpl group = new BroadcastGroupImpl(localBindAddress,
+                                                        config.getLocalBindPort(),
+                                                        groupAddress,
+                                                        config.getGroupPort());
+
+      // Start first: if this fails, nothing has been scheduled or registered yet
+      group.start();
+
+      try
+      {
+         ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+                                                                              0L,
+                                                                              config.getBroadcastPeriod(),
+                                                                              MILLISECONDS);
+
+         group.setScheduledFuture(future);
+      }
+      catch (RuntimeException e)
+      {
+         // Scheduling was rejected - release the socket opened by start()
+         group.stop();
+
+         throw e;
+      }
+
+      broadcastGroups.put(config.getName(), group);
+   }
+
+   /**
+    * Creates and starts a discovery group and registers it under the configured name.
+    *
+    * @throws IllegalArgumentException if a discovery group with that name is already deployed
+    */
+   public synchronized void deployDiscoveryGroup(final DiscoveryGroupConfiguration config) throws Exception
+   {
+      if (discoveryGroups.containsKey(config.getName()))
+      {
+         throw new IllegalArgumentException("There is already a discovery-group with name " + config.getName() +
+                                            " deployed");
+      }
+
+      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+      DiscoveryGroup group = new DiscoveryGroupImpl(groupAddress, config.getGroupPort(), config.getRefreshTimeout());
+
+      // Register only once the group has started successfully
+      group.start();
+
+      discoveryGroups.put(config.getName(), group);
+   }
+
+   /**
+    * Creates and starts a message flow and registers it under the configured name.
+    * The flow obtains its connectors from the named discovery group when one is
+    * configured, and from the static connector list of the configuration otherwise.
+    *
+    * @throws IllegalArgumentException if a message flow with that name is already deployed,
+    *         if the transformer class cannot be instantiated, or if the named discovery
+    *         group is not deployed
+    */
+   public synchronized void deployMessageFlow(final MessageFlowConfiguration config) throws Exception
+   {
+      if (messageFlows.containsKey(config.getName()))
+      {
+         throw new IllegalArgumentException("There is already a message-flow with name " + config.getName() +
+                                            " deployed");
+      }
+
+      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+      SimpleString filterString = config.getFilterString() == null ? null
+                                                                   : new SimpleString(config.getFilterString());
+
+      MessageFlow flow;
+
+      if (config.getDiscoveryGroupName() == null)
+      {
+         // Create message flow with list of static connectors
+
+         flow = new MessageFlowImpl(new SimpleString(config.getName()),
+                                    new SimpleString(config.getAddress()),
+                                    config.getMaxBatchSize(),
+                                    config.getMaxBatchTime(),
+                                    filterString,
+                                    config.isFanout(),
+                                    executorFactory,
+                                    storageManager,
+                                    postOffice,
+                                    queueSettingsRepository,
+                                    transformer,
+                                    config.getConnectors());
+      }
+      else
+      {
+         // Create message flow with connectors from discovery group
+
+         DiscoveryGroup group = discoveryGroups.get(config.getDiscoveryGroupName());
+
+         if (group == null)
+         {
+            throw new IllegalArgumentException("There is no discovery-group with name " + config.getDiscoveryGroupName() +
+                                               " deployed");
+         }
+
+         flow = new MessageFlowImpl(new SimpleString(config.getName()),
+                                    new SimpleString(config.getAddress()),
+                                    config.getMaxBatchSize(),
+                                    config.getMaxBatchTime(),
+                                    filterString,
+                                    config.isFanout(),
+                                    executorFactory,
+                                    storageManager,
+                                    postOffice,
+                                    queueSettingsRepository,
+                                    transformer,
+                                    group);
+      }
+
+      // Register only once the flow has started successfully
+      flow.start();
+
+      messageFlows.put(config.getName(), flow);
+   }
+
+   /**
+    * Stops the named broadcast group and removes it from the registry so that the name
+    * can be deployed again. An unknown name is ignored.
+    */
+   public synchronized void undeployBroadcastGroup(final String name) throws Exception
+   {
+      BroadcastGroup group = broadcastGroups.remove(name);
+
+      if (group != null)
+      {
+         group.stop();
+      }
+   }
+
+   /**
+    * Stops the named discovery group and removes it from the registry so that the name
+    * can be deployed again. An unknown name is ignored.
+    */
+   public synchronized void undeployDiscoveryGroup(final String name) throws Exception
+   {
+      DiscoveryGroup group = discoveryGroups.remove(name);
+
+      if (group != null)
+      {
+         group.stop();
+      }
+   }
+
+   /**
+    * Stops the named message flow and removes it from the registry so that the name
+    * can be deployed again. An unknown name is ignored.
+    */
+   public synchronized void undeployMessageFlow(final String name) throws Exception
+   {
+      MessageFlow flow = messageFlows.remove(name);
+
+      if (flow != null)
+      {
+         flow.stop();
+      }
+   }
+
+   /**
+    * Loads the named class through the thread context class loader and instantiates it
+    * with its no-argument constructor.
+    *
+    * @param className the transformer class name, or null when no transformer is configured
+    * @return the new transformer, or null when className is null
+    * @throws IllegalArgumentException if the class cannot be loaded, instantiated or cast
+    */
+   private static Transformer instantiateTransformer(final String className)
+   {
+      if (className == null)
+      {
+         return null;
+      }
+
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+      try
+      {
+         Class<?> clz = loader.loadClass(className);
+
+         return (Transformer)clz.newInstance();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalArgumentException("Error instantiating transformer class \"" + className + "\"", e);
+      }
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.DiscoveryListener;
+
+/**
+ * A DiscoveryGroupImpl
+ *
+ * Joins a multicast group and runs a dedicated thread that receives datagrams, each
+ * carrying a connector count followed by that many serialized TransportConfiguration
+ * objects. Every connector seen is remembered together with the time it was last
+ * announced; connectors not re-announced within the configured timeout are dropped the
+ * next time a broadcast is processed.
+ *
+ * The socket is opened in the constructor and closed by {@link #stop()}, and the receiver
+ * thread is created once, so an instance cannot be started again after it has been
+ * stopped.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 17 Nov 2008 13:21:45
+ *
+ */
+public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
+{
+   private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+
+   // Receive timeout in milliseconds; bounds how long the receiver thread takes to notice a stop
+   private static final int SOCKET_TIMEOUT = 500;
+
+   private final MulticastSocket socket;
+
+   // Guarded by this
+   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+   private final Thread thread;
+
+   // Guarded by waitLock
+   private boolean received;
+
+   private final Object waitLock = new Object();
+
+   // Connector -> time (ms) it was last announced. Guarded by this
+   private final Map<TransportConfiguration, Long> connectors = new HashMap<TransportConfiguration, Long>();
+
+   private final long timeout;
+
+   private volatile boolean started;
+
+   /**
+    * Opens a multicast socket on the given port and joins the given group. The receiver
+    * thread is created but not started.
+    *
+    * @param groupAddress the multicast group to join
+    * @param groupPort the port to listen on
+    * @param timeout how long, in milliseconds, a connector is kept after its last announcement
+    * @throws Exception if the socket cannot be opened or the group cannot be joined
+    */
+   public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+   {
+      MulticastSocket sock = new MulticastSocket(groupPort);
+
+      try
+      {
+         sock.joinGroup(groupAddress);
+
+         sock.setSoTimeout(SOCKET_TIMEOUT);
+      }
+      catch (Exception e)
+      {
+         // Do not leak the bound socket when the group cannot be joined
+         sock.close();
+
+         throw e;
+      }
+
+      this.socket = sock;
+
+      this.timeout = timeout;
+
+      thread = new Thread(this);
+   }
+
+   /**
+    * Starts the receiver thread. Does nothing if the group is already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      // The flag must be set before the thread runs: run() exits as soon as it sees it cleared
+      started = true;
+
+      try
+      {
+         thread.start();
+      }
+      catch (RuntimeException e)
+      {
+         started = false;
+
+         throw e;
+      }
+   }
+
+   /**
+    * Asks the receiver thread to exit, waits for it and closes the socket. Does nothing
+    * if the group is not started.
+    */
+   public void stop()
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         // Clear the flag first, otherwise the receiver loop never terminates
+         started = false;
+      }
+
+      // Join outside the monitor: the receiver thread needs it to finish its current packet
+      try
+      {
+         thread.join();
+      }
+      catch (InterruptedException e)
+      {
+         // Preserve the interrupt for the caller; the socket is still closed below
+         Thread.currentThread().interrupt();
+      }
+
+      socket.close();
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * @return a new list containing the connectors currently known
+    */
+   public synchronized List<TransportConfiguration> getConnectors()
+   {
+      return new ArrayList<TransportConfiguration>(connectors.keySet());
+   }
+
+   /**
+    * Waits until a broadcast has been received or the timeout has elapsed. The
+    * "received" flag is cleared before returning.
+    *
+    * @param timeout the maximum time to wait, in milliseconds
+    * @return true if a broadcast was received, false otherwise
+    */
+   public boolean waitForBroadcast(final long timeout)
+   {
+      synchronized (waitLock)
+      {
+         long start = System.currentTimeMillis();
+
+         long toWait = timeout;
+
+         while (!received && toWait > 0)
+         {
+            try
+            {
+               waitLock.wait(toWait);
+            }
+            catch (InterruptedException e)
+            {
+               // Deliberately ignored: keep waiting for whatever time is left
+            }
+
+            long now = System.currentTimeMillis();
+
+            toWait -= now - start;
+
+            start = now;
+         }
+
+         boolean ret = received;
+
+         received = false;
+
+         return ret;
+      }
+   }
+
+   /**
+    * Receiver loop. Runs until the group is stopped or the socket fails. A packet that
+    * cannot be decoded is logged and skipped without ending the loop.
+    */
+   public void run()
+   {
+      //TODO - can we use a smaller buffer size?
+      final byte[] data = new byte[65535];
+
+      final DatagramPacket packet = new DatagramPacket(data, data.length);
+
+      while (started)
+      {
+         // receive() shrinks the packet length to the size of the last datagram
+         packet.setLength(data.length);
+
+         try
+         {
+            socket.receive(packet);
+         }
+         catch (InterruptedIOException e)
+         {
+            // Receive timed out - loop round to re-check the started flag
+            continue;
+         }
+         catch (Exception e)
+         {
+            // The socket is unusable; retrying would only spin
+            if (started)
+            {
+               log.error("Failed to receive datagram", e);
+            }
+
+            return;
+         }
+
+         log.info("Listener received packet");
+
+         List<TransportConfiguration> announced;
+
+         try
+         {
+            announced = readConnectors(packet);
+         }
+         catch (Exception e)
+         {
+            // One bad datagram must not end discovery
+            log.error("Failed to read connectors from discovery packet", e);
+
+            continue;
+         }
+
+         Long now = System.currentTimeMillis();
+
+         synchronized (this)
+         {
+            for (TransportConfiguration connector : announced)
+            {
+               connectors.put(connector, now);
+            }
+         }
+
+         synchronized (waitLock)
+         {
+            received = true;
+
+            waitLock.notify();
+         }
+
+         callListeners();
+      }
+   }
+
+   /**
+    * Decodes the connectors carried by a received datagram, reading only the bytes that
+    * were actually received.
+    *
+    * NOTE(review): this uses Java deserialization on data taken straight from the
+    * network; anything able to send to the group can supply the serialized stream.
+    */
+   private List<TransportConfiguration> readConnectors(final DatagramPacket packet) throws Exception
+   {
+      ByteArrayInputStream bis = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
+
+      ObjectInputStream ois = new ObjectInputStream(bis);
+
+      int size = ois.readInt();
+
+      // Not pre-sized: the count comes from the network and cannot be trusted
+      List<TransportConfiguration> result = new ArrayList<TransportConfiguration>();
+
+      for (int i = 0; i < size; i++)
+      {
+         result.add((TransportConfiguration)ois.readObject());
+      }
+
+      return result;
+   }
+
+   public synchronized void registerListener(final DiscoveryListener listener)
+   {
+      this.listeners.add(listener);
+   }
+
+   public synchronized void unregisterListener(final DiscoveryListener listener)
+   {
+      this.listeners.remove(listener);
+   }
+
+   /**
+    * Drops connectors whose last announcement is older than the timeout, then notifies
+    * every registered listener.
+    */
+   private synchronized void callListeners()
+   {
+      long now = System.currentTimeMillis();
+
+      Iterator<Map.Entry<TransportConfiguration, Long>> iter = connectors.entrySet().iterator();
+
+      //Weed out any expired connectors
+
+      while (iter.hasNext())
+      {
+         Map.Entry<TransportConfiguration, Long> entry = iter.next();
+
+         if (entry.getValue() + timeout <= now)
+         {
+            iter.remove();
+         }
+      }
+
+      for (DiscoveryListener listener : listeners)
+      {
+         try
+         {
+            listener.connectorsChanged();
+         }
+         catch (Throwable t)
+         {
+            //Catch it so exception doesn't prevent other listeners from running
+            log.error("Failed to call discovery listener", t);
+         }
+      }
+   }
+}

Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java (from rev 5368, trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,288 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.Future;
+
+/**
+ * A ForwarderImpl
+ *
+ * Consumes message references from a local {@link Queue}, collects them into a batch and, once
+ * maxBatchSize references have been accepted, sends the batch on an executor through a
+ * {@link ClientSession} created from the supplied connector configuration. The remote session is
+ * committed first and the local acknowledgement transaction afterwards.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 12 Nov 2008 11:37:35
+ *
+ *
+ */
+public class ForwarderImpl implements Forwarder
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ForwarderImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final Queue queue;
+
+   private final Executor executor;
+
+   // True from the moment a full batch is handed to the executor until it has been sent;
+   // read without the lock at the top of handle()
+   private volatile boolean busy;
+
+   private final int maxBatchSize;
+
+   // Stored but not read anywhere in this class
+   private final long maxBatchTime;
+
+   // Number of references accepted since the last successfully sent batch
+   private int count;
+
+   private final java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
+
+   // Local transaction collecting the acknowledgements of the current batch
+   private Transaction tx;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   // Optional; when null messages are forwarded unchanged
+   private final Transformer transformer;
+
+   private final ClientSessionFactory csf;
+
+   private ClientSession session;
+
+   private ClientProducer producer;
+
+   private volatile boolean started;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a forwarder. No session is opened and the queue is not touched until start().
+    *
+    * @param queue the local queue to consume from
+    * @param connectorConfig connector used to build the session factory for the target
+    * @param executor executor on which batches are sent
+    * @param maxBatchSize number of references that triggers sending a batch
+    * @param maxBatchTime stored only; not used by this class
+    * @param storageManager passed to every local transaction
+    * @param postOffice passed to every local transaction
+    * @param queueSettingsRepository passed to the transaction on rollback
+    * @param transformer optional message transformer, may be null
+    */
+   public ForwarderImpl(final Queue queue,
+                        final TransportConfiguration connectorConfig,
+                        final Executor executor,
+                        final int maxBatchSize,
+                        final long maxBatchTime,
+                        final StorageManager storageManager,
+                        final PostOffice postOffice,
+                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                        final Transformer transformer) throws Exception
+   {
+      this.queue = queue;
+
+      this.executor = executor;
+
+      this.maxBatchSize = maxBatchSize;
+
+      this.maxBatchTime = maxBatchTime;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.transformer = transformer;
+
+      this.csf = new ClientSessionFactoryImpl(connectorConfig);
+   }
+
+   /**
+    * Opens the session and producer, creates the first local transaction and registers this
+    * forwarder as a consumer of the queue. Does nothing if already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      createTx();
+
+      session = csf.createSession(false, false, false);
+
+      producer = session.createProducer(null);
+
+      queue.addConsumer(this);
+
+      started = true;
+   }
+
+   /**
+    * Deregisters from the queue, waits up to ten seconds for work already handed to the executor
+    * and closes the session. Does nothing if the forwarder is not started, so calling it before
+    * start() or more than once is safe.
+    */
+   public synchronized void stop() throws Exception
+   {
+      // Without this guard session may be null (never started) or already closed (stopped twice)
+      if (!started)
+      {
+         return;
+      }
+
+      started = false;
+
+      queue.removeConsumer(this);
+
+      // Wait until all batches are complete
+      // NOTE(review): this monitor is held while waiting and sendBatch() needs the same monitor;
+      // if the executor runs tasks in order a pending batch cannot finish before the timeout - confirm
+
+      Future future = new Future();
+
+      executor.execute(future);
+
+      boolean ok = future.await(10000);
+
+      if (!ok)
+      {
+         log.warn("Timed out waiting for batch to be sent");
+      }
+
+      session.close();
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   // Consumer implementation ---------------------------------------
+
+   /**
+    * Accepts a reference into the current batch.
+    *
+    * @param reference the reference offered by the queue
+    * @return BUSY while a batch is being sent or the forwarder is not started, otherwise HANDLED
+    */
+   public HandleStatus handle(final MessageReference reference) throws Exception
+   {
+      if (busy)
+      {
+         return HandleStatus.BUSY;
+      }
+
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return HandleStatus.BUSY;
+         }
+
+         refs.add(reference);
+
+         count++;
+
+         if (count == maxBatchSize)
+         {
+            // Refuse further references until the batch has gone out
+            busy = true;
+
+            executor.execute(new BatchSender());
+         }
+
+         return HandleStatus.HANDLED;
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Drains the accumulated references, sends each message (transformed if a transformer is set),
+    * commits the remote session and then the local acknowledgements, and asks the queue to
+    * deliver again. On any exception the local transaction is rolled back.
+    */
+   private void sendBatch()
+   {
+      try
+      {
+         synchronized (this)
+         {
+            // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
+
+            while (true)
+            {
+               MessageReference ref = refs.poll();
+
+               if (ref == null)
+               {
+                  break;
+               }
+
+               tx.addAcknowledgement(ref);
+
+               ServerMessage message = ref.getMessage();
+
+               if (transformer != null)
+               {
+                  message = transformer.transform(message);
+               }
+
+               producer.send(message.getDestination(), message);
+            }
+
+            session.commit();
+
+            tx.commit();
+
+            createTx();
+
+            busy = false;
+
+            count = 0;
+         }
+
+         queue.deliverAsync(executor);
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to forward batch", e);
+
+         // NOTE(review): busy and count are not reset and no new transaction is created on this
+         // path, so handle() keeps returning BUSY after a failed batch - confirm this is intended
+         try
+         {
+            tx.rollback(queueSettingsRepository);
+         }
+         catch (Exception e2)
+         {
+            log.error("Failed to rollback", e2);
+         }
+      }
+   }
+
+   private void createTx()
+   {
+      tx = new TransactionImpl(storageManager, postOffice);
+   }
+
+   // Inner classes -------------------------------------------------
+
+   // Executor task that sends the current batch
+   private class BatchSender implements Runnable
+   {
+      public void run()
+      {
+         sendBatch();
+      }
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,292 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.DiscoveryListener;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
+
+/**
+ * A MessageFlowImpl
+ *
+ * Maintains one {@link Forwarder} per target connector. The connectors come either from a fixed
+ * list given at construction time or from a {@link DiscoveryGroup}, in which case this object
+ * registers itself as a {@link DiscoveryListener} and re-synchronises its forwarders whenever
+ * the discovered connectors change.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 17 Nov 2008 15:55:49
+ *
+ *
+ */
+public class MessageFlowImpl implements DiscoveryListener, MessageFlow
+{
+   private static final Logger log = Logger.getLogger(MessageFlowImpl.class);
+
+   private final SimpleString name;
+
+   private final SimpleString address;
+
+   private final SimpleString filterString;
+
+   private final boolean fanout;
+
+   private final int maxBatchSize;
+
+   private final long maxBatchTime;
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final Transformer transformer;
+
+   // Active forwarders keyed by the connector they forward to
+   private final Map<TransportConfiguration, Forwarder> forwarders = new HashMap<TransportConfiguration, Forwarder>();
+
+   // Null when the flow was built from a static connector list
+   private final DiscoveryGroup discoveryGroup;
+
+   private volatile boolean started;
+
+   /*
+    * Constructor using static list of connectors.
+    * The forwarders for the given connectors are created and started here, before start() is called.
+    */
+   public MessageFlowImpl(final SimpleString name,
+                          final SimpleString address,
+                          final int maxBatchSize,
+                          final long maxBatchTime,
+                          final SimpleString filterString,
+                          final boolean fanout,
+                          final ExecutorFactory executorFactory,
+                          final StorageManager storageManager,
+                          final PostOffice postOffice,
+                          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final Transformer transformer,
+                          final List<TransportConfiguration> connectors) throws Exception
+   {
+      this.name = name;
+
+      this.address = address;
+
+      this.maxBatchSize = maxBatchSize;
+
+      this.maxBatchTime = maxBatchTime;
+
+      this.filterString = filterString;
+
+      this.fanout = fanout;
+
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.transformer = transformer;
+
+      this.discoveryGroup = null;
+
+      this.updateConnectors(connectors);
+   }
+
+   /*
+    * Constructor using discovery to get connectors.
+    * No forwarder is created until start() reads the connectors from the discovery group.
+    */
+   public MessageFlowImpl(final SimpleString name,
+                          final SimpleString address,
+                          final int maxBatchSize,
+                          final long maxBatchTime,
+                          final SimpleString filterString,
+                          final boolean fanout,
+                          final ExecutorFactory executorFactory,
+                          final StorageManager storageManager,
+                          final PostOffice postOffice,
+                          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final Transformer transformer,
+                          final DiscoveryGroup discoveryGroup) throws Exception
+   {
+      this.name = name;
+
+      this.address = address;
+
+      this.maxBatchSize = maxBatchSize;
+
+      this.maxBatchTime = maxBatchTime;
+
+      this.filterString = filterString;
+
+      this.fanout = fanout;
+
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.transformer = transformer;
+
+      this.discoveryGroup = discoveryGroup;
+   }
+
+   /**
+    * Marks the flow as started. With a discovery group, first creates forwarders for the
+    * currently known connectors and then registers for change notifications. Does nothing if
+    * already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+      if (discoveryGroup != null)
+      {
+         updateConnectors(discoveryGroup.getConnectors());
+
+         discoveryGroup.registerListener(this);
+      }
+
+      started = true;
+   }
+
+   /**
+    * Unregisters from the discovery group (if any) and stops every forwarder. The forwarders
+    * stay in the map. Does nothing if not started.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (discoveryGroup != null)
+      {
+         discoveryGroup.unregisterListener(this);
+      }
+
+      for (Forwarder forwarder : forwarders.values())
+      {
+         forwarder.stop();
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   // DiscoveryListener implementation ------------------------------------------------------------------
+
+   /**
+    * Re-reads the connectors from the discovery group and synchronises the forwarders with them.
+    * Failures are logged, not propagated.
+    *
+    * NOTE(review): this method does not take this object's monitor although start() and stop()
+    * do; confirm how access to the forwarders map is serialised between the calling threads.
+    */
+   public void connectorsChanged()
+   {
+      try
+      {
+         List<TransportConfiguration> connectors = discoveryGroup.getConnectors();
+
+         updateConnectors(connectors);
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to update connectors", e);
+      }
+   }
+
+   /**
+    * Makes the set of forwarders match the given connectors: forwarders whose connector is no
+    * longer listed are stopped and removed, and a new forwarder (with its own queue bound to the
+    * flow address) is created and started for every connector that has none yet.
+    *
+    * @param connectors the connectors that should currently have a forwarder
+    */
+   private void updateConnectors(final List<TransportConfiguration> connectors) throws Exception
+   {
+      Set<TransportConfiguration> connectorSet = new HashSet<TransportConfiguration>();
+
+      connectorSet.addAll(connectors);
+
+      Iterator<Map.Entry<TransportConfiguration, Forwarder>> iter = forwarders.entrySet().iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry<TransportConfiguration, Forwarder> entry = iter.next();
+
+         if (!connectorSet.contains(entry.getKey()))
+         {
+            // Connector no longer there - we should remove and close it
+
+            entry.getValue().stop();
+
+            iter.remove();
+         }
+      }
+
+      for (TransportConfiguration connector : connectors)
+      {
+         if (!forwarders.containsKey(connector))
+         {
+            // The queue name embeds a freshly generated UUID
+            SimpleString queueName = new SimpleString("outflow." + name +
+                                                      "." +
+                                                      UUIDGenerator.getInstance().generateSimpleStringUUID());
+
+            Binding binding = postOffice.getBinding(queueName);
+
+            // TODO need to delete store and forward queues that are no longer in the config
+            // and also allow ability to change filterstring etc. while keeping the same name
+            if (binding == null)
+            {
+               Filter filter = filterString == null ? null : new FilterImpl(filterString);
+
+               binding = postOffice.addBinding(address, queueName, filter, true, false, fanout);
+            }
+
+            Forwarder forwarder = new ForwarderImpl(binding.getQueue(),
+                                                    connector,
+                                                    executorFactory.getExecutor(),
+                                                    maxBatchSize,
+                                                    maxBatchTime,
+                                                    storageManager,
+                                                    postOffice,
+                                                    queueSettingsRepository,
+                                                    transformer);
+
+            forwarders.put(connector, forwarder);
+
+            // ForwarderImpl.start() registers the forwarder as a consumer of its queue itself, and
+            // stop() removes it only once, so it must not also be registered here
+            forwarder.start();
+         }
+      }
+   }
+
+}

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,254 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.impl;
-
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Forwarder;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.Future;
-
-/**
- * A ForwarderImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 12 Nov 2008 11:37:35
- *
- *
- */
-public class ForwarderImpl implements Forwarder
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(ForwarderImpl.class);
-
-   // Attributes ----------------------------------------------------
-
-   private final Queue queue;
-
-   private Executor executor;
-
-   private volatile boolean busy;
-
-   private int maxBatchSize;
-
-   private long maxBatchTime;
-
-   private int count;
-
-   private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
-
-   private boolean closed;
-
-   private Transaction tx;
-
-   private final StorageManager storageManager;
-
-   private final PostOffice postOffice;
-
-   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
-   private final ClientSession session;
-
-   private final ClientProducer producer;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public ForwarderImpl(final Queue queue,
-                    final TransportConfiguration connectorConfig, final Executor executor, final int maxBatchSize,
-                    final long maxBatchTime,
-                    final StorageManager storageManager, final PostOffice postOffice,
-                    final HierarchicalRepository<QueueSettings> queueSettingsRepository)
-      throws Exception
-   {
-      this.queue = queue;
-      
-      this.executor = executor;
-      
-      this.maxBatchSize = maxBatchSize;
-      
-      this.maxBatchTime = maxBatchTime;
-      
-      this.storageManager = storageManager;
-      
-      this.postOffice = postOffice;
-      
-      this.queueSettingsRepository = queueSettingsRepository;
-      
-      createTx();
-      
-      ClientSessionFactory csf = new ClientSessionFactoryImpl(connectorConfig);
-      
-      session = csf.createSession(false, false, false);
-      
-      producer = session.createProducer(null);
-      
-      queue.addConsumer(this);
-   }
-
-   public synchronized void close() throws Exception
-   {
-      closed = true;
-      
-      queue.removeConsumer(this);
-
-      // Wait until all batches are complete
-
-      Future future = new Future();
-
-      executor.execute(future);
-
-      boolean ok = future.await(10000);
-
-      if (!ok)
-      {
-         log.warn("Timed out waiting for batch to be sent");
-      }
-   }
-
-   // Consumer implementation ---------------------------------------
-
-   public HandleStatus handle(final MessageReference reference) throws Exception
-   {
-      if (busy)
-      {         
-         return HandleStatus.BUSY;
-      }
-
-      synchronized (this)
-      {
-         if (closed)
-         {
-            return HandleStatus.BUSY;
-         }
-
-         refs.add(reference);
-
-         count++;
-
-         if (count == maxBatchSize)
-         {
-            busy = true;
-
-            executor.execute(new BatchSender());                        
-         }
-
-         return HandleStatus.HANDLED;
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private void sendBatch()
-   {
-      try
-      {
-         synchronized (this)
-         {
-            //TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
-   
-            while (true)
-            {
-               MessageReference ref = refs.poll();
-   
-               if (ref == null)
-               {
-                  break;
-               }
-   
-               tx.addAcknowledgement(ref);
-   
-               Message message = ref.getMessage();
-               
-               producer.send(message.getDestination(), message);
-            }
-   
-            session.commit();
-   
-            tx.commit();
-   
-            createTx();
-   
-            busy = false;
-            
-            count = 0;
-         }
-         
-         queue.deliverAsync(executor);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to forward batch", e);
-
-         try
-         {
-            tx.rollback(queueSettingsRepository);
-         }
-         catch (Exception e2)
-         {
-            log.error("Failed to rollback", e2);
-         }
-      }
-   }
-
-   private void createTx()
-   {      
-      tx = new TransactionImpl(storageManager, postOffice);
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private class BatchSender implements Runnable
-   {
-      public void run()
-      {
-         sendBatch();
-      }
-   }
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -25,11 +25,11 @@
 
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -38,7 +38,6 @@
 import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
 import org.jboss.messaging.core.remoting.Channel;
@@ -54,11 +53,12 @@
 import org.jboss.messaging.core.security.Role;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
-import org.jboss.messaging.core.server.Forwarder;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
+import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -68,8 +68,6 @@
 import org.jboss.messaging.util.ExecutorFactory;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
 import org.jboss.messaging.util.VersionLoader;
 
 /**
@@ -122,6 +120,8 @@
 
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
+   private ClusterManager clusterManager;
+
    // plugins
 
    private StorageManager storageManager;
@@ -218,7 +218,11 @@
 
       storeFactory.setPagingManager(pagingManager);
 
-      resourceManager = new ResourceManagerImpl((int) configuration.getTransactionTimeout()/1000, configuration.getTransactionTimeoutScanPeriod(), storageManager, postOffice, queueSettingsRepository);
+      resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
+                                                configuration.getTransactionTimeoutScanPeriod(),
+                                                storageManager,
+                                                postOffice,
+                                                queueSettingsRepository);
       postOffice = new PostOfficeImpl(storageManager,
                                       pagingManager,
                                       queueFactory,
@@ -269,9 +273,35 @@
                                                                   ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
       }
       remotingService.setMessagingServer(this);
-      
-      startOutflows();
 
+      if (configuration.isClustered())
+      {
+         clusterManager = new ClusterManagerImpl(executorFactory,
+                                                 storageManager,
+                                                 postOffice,
+                                                 queueSettingsRepository,
+                                                 scheduledExecutor);
+
+         clusterManager.start();
+         
+         //Deploy the cluster artifacts
+         
+         for (BroadcastGroupConfiguration config: configuration.getBroadcastGroupConfigurations())
+         {
+            clusterManager.deployBroadcastGroup(config);
+         }
+         
+         for (DiscoveryGroupConfiguration config: configuration.getDiscoveryGroupConfigurations())
+         {
+            clusterManager.deployDiscoveryGroup(config);
+         }
+         
+         for (MessageFlowConfiguration config: configuration.getMessageFlowConfigurations())
+         {
+            clusterManager.deployMessageFlow(config);
+         }
+      }
+            
       started = true;
    }
 
@@ -281,9 +311,12 @@
       {
          return;
       }
-      
-      stopOutflows();
 
+      if (clusterManager != null)
+      {
+         clusterManager.stop();
+      }
+
       asyncDeliveryPool.shutdown();
 
       try
@@ -310,7 +343,7 @@
       queueFactory = null;
       resourceManager = null;
       serverManagement = null;
-            
+
       started = false;
    }
 
@@ -407,7 +440,7 @@
    {
       return resourceManager;
    }
-   
+
    public Version getVersion()
    {
       return version;
@@ -464,53 +497,6 @@
          connection.freeze();
       }
    }
-   
-   private Set<Forwarder> forwarders = new HashSet<Forwarder>();
-   
-   private void startOutflows() throws Exception
-   {
-      Set<OutflowConfiguration> outflows = configuration.getOutflowConfigurations();
-      
-      for (OutflowConfiguration outflowConfig: outflows)
-      {
-         for (TransportConfiguration connectorConfig: outflowConfig.getConnectors())
-         {
-            SimpleString queueName = new SimpleString("outflow." + outflowConfig.getName() + "." + 
-                                                      UUIDGenerator.getInstance().generateSimpleStringUUID());
-            
-            Binding binding = postOffice.getBinding(queueName);
-                     
-            //TODO need to delete store and forward queues that are no longer in the config
-            //and also allow ability to change filterstring etc. while keeping the same name
-            if (binding == null)
-            {
-               SimpleString address = new SimpleString(outflowConfig.getAddress());
-               
-               SimpleString filterString = outflowConfig.getFilterString() == null ? null : new SimpleString(outflowConfig.getFilterString());
-               
-               Filter filter = filterString == null ? null : new FilterImpl(filterString);
-               
-               binding = postOffice.addBinding(address, queueName, filter, true, false, outflowConfig.isFanout());
-            }
-            
-            Forwarder forwarder = new ForwarderImpl(binding.getQueue(), connectorConfig, executorFactory.getExecutor(),
-                                                outflowConfig.getMaxBatchSize(), outflowConfig.getMaxBatchTime(),
-                                                storageManager, postOffice, queueSettingsRepository);
-            
-            forwarders.add(forwarder);
-            
-            binding.getQueue().addConsumer(forwarder);
-         }
-      }
-   }
-   
-   private void stopOutflows() throws Exception
-   {
-      for (Forwarder forwarder: forwarders)
-      {
-         forwarder.close();
-      }
-   }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
                                                          final String name,
@@ -569,7 +555,7 @@
                                                      final boolean autoCommitAcks,
                                                      final boolean xa,
                                                      final int sendWindowSize) throws Exception
-   {      
+   {
       checkActivate(connection);
 
       return doCreateSession(name,
@@ -693,7 +679,7 @@
                                                               executorFactory.getExecutor(),
                                                               channel,
                                                               managementService,
-                                                              this,                                                    
+                                                              this,
                                                               configuration.getManagementAddress());
 
       sessions.put(name, session);
@@ -703,7 +689,7 @@
       channel.setHandler(handler);
 
       connection.addFailureListener(session);
-      
+
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,223 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A OutflowBatchSizeTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 15 Nov 2008 09:06:55
- *
- *
- */
-public class OutflowBatchSizeTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private MessagingService service0;
-
-   private MessagingService service1;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testBatchSize() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString address1 = new SimpleString("testaddress");
-                       
-      final int batchSize = 10;
- 
-      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, connectors);
-      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setOutFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(address1, address1, null, false, false, true);
-
-      session1.createQueue(address1, address1, null, false, false, true);  
-      
-      ClientProducer prod0_1 = session0.createProducer(address1);
-         
-      ClientConsumer cons0_1 = session0.createConsumer(address1);
-      
-      ClientConsumer cons1_1 = session1.createConsumer(address1);
-
-      session0.start();
-      
-      session1.start();
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      for (int j = 0; j < 10; j++)
-      {
-          
-         for (int i = 0; i < batchSize - 1; i++)
-         {      
-            ClientMessage message = session0.createClientMessage(false);
-            message.putIntProperty(propKey, i);        
-            message.getBody().flip();
-                 
-            prod0_1.send(message);
-         }
-              
-         for (int i = 0; i < batchSize - 1; i++)
-         {
-            ClientMessage rmessage1 = cons0_1.receive(1000);
-            
-            assertNotNull(rmessage1);
-            
-            assertEquals(i, rmessage1.getProperty(propKey));         
-         }
-         
-         ClientMessage rmessage1 = cons1_1.receive(250);
-         
-         assertNull(rmessage1);
-         
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, batchSize - 1);        
-         message.getBody().flip();
-              
-         prod0_1.send(message);
-         
-         rmessage1 = cons0_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(batchSize - 1, rmessage1.getProperty(propKey));  
-         
-         for (int i = 0; i < batchSize; i++)
-         {
-            rmessage1 = cons1_1.receive(1000);
-            
-            assertNotNull(rmessage1);
-            
-            assertEquals(i, rmessage1.getProperty(propKey));         
-         }
-      }
-            
-      session0.close();
-      
-      session1.close();
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      service0.stop();
-      
-      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
-      service1.stop();
-
-      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
-
-

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test for the batch size of a message flow between two in-VM
+ * servers: messages sent on server 0 must reach the consumer on server 1 only
+ * once batchSize (10) of them have been sent.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 09:06:55
+ *
+ *
+ */
+public class OutflowBatchSizeTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   // Server whose configuration carries the message flow (server id 0)
+   private MessagingService service0;
+   // Server the flow's connector points at (server id 1)
+   private MessagingService service1;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   /** Verifies that the consumer on server 1 receives messages only in complete batches. */
+   public void testBatchSize() throws Exception
+   {
+      Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setClustered(true);
+      service0Conf.setSecurityEnabled(false);
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      service0Conf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  service0Params));
+      // Server 1 is configured and started before server 0
+      Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setClustered(true);
+      service1Conf.setSecurityEnabled(false);
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                                              service1Params));
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+      // Connector list for the flow: a single in-VM connector built from server 1's parameters
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+                       
+      final int batchSize = 10;
+      // NOTE(review): batchSize is presumably the flow's maximum batch size - confirm against MessageFlowConfiguration
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
+      // Server 0 is created and started only after the flow has been added to its configuration
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, true);
+
+      session1.createQueue(address1, address1, null, false, false, true);  
+      
+      ClientProducer prod0_1 = session0.createProducer(address1);
+         
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+      session0.start();
+      
+      session1.start();
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      // Ten rounds; each round sends exactly batchSize messages in total through server 0
+      for (int j = 0; j < 10; j++)
+      {
+         // Send batchSize - 1 messages, numbered through the "testkey" property
+         for (int i = 0; i < batchSize - 1; i++)
+         {      
+            ClientMessage message = session0.createClientMessage(false);
+            message.putIntProperty(propKey, i);        
+            message.getBody().flip();
+                 
+            prod0_1.send(message);
+         }
+         // The local consumer on server 0 must receive all of them, in order
+         for (int i = 0; i < batchSize - 1; i++)
+         {
+            ClientMessage rmessage1 = cons0_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+         // The consumer on server 1 must not have received anything yet
+         ClientMessage rmessage1 = cons1_1.receive(250);
+         
+         assertNull(rmessage1);
+         // Send one more message, bringing this round's total to batchSize
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, batchSize - 1);        
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+         
+         rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(batchSize - 1, rmessage1.getProperty(propKey));  
+         // Now all batchSize messages of the round must arrive at the consumer on server 1, in order
+         for (int i = 0; i < batchSize; i++)
+         {
+            rmessage1 = cons1_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+      }
+            
+      session0.close();
+      
+      session1.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   /** Intentionally empty: both servers are created inside the test method. */
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+   /** Stops both servers and asserts that no connections remain and the in-VM registry is empty. */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service0.stop();
+      
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+      service1.stop();
+
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
    public void testWithWildcard() throws Exception
    {
       Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setClustered(true);
       service0Conf.setSecurityEnabled(false);
       Map<String, Object> service0Params = new HashMap<String, Object>();
       service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
                                                   service0Params));
 
       Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setClustered(true);
       service1Conf.setSecurityEnabled(false);
       Map<String, Object> service1Params = new HashMap<String, Object>();
       service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -105,10 +107,10 @@
                  
       final String filter = "selectorkey='ORANGES'";
       
-      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, connectors);
-      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
-      service0Conf.setOutFlowConfigurations(ofconfigs);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
 
       service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
       service0.start();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
    public void testWithWildcard() throws Exception
    {
       Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setClustered(true);
       service0Conf.setSecurityEnabled(false);
       Map<String, Object> service0Params = new HashMap<String, Object>();
       service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
                                                   service0Params));
 
       Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setClustered(true);
       service1Conf.setSecurityEnabled(false);
       Map<String, Object> service1Params = new HashMap<String, Object>();
       service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -112,10 +114,10 @@
       final SimpleString match1 = new SimpleString("cheese.#");
       
             
-      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", match1.toString(), null, true, 1, 0, connectors);
-      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, 0, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
-      service0Conf.setOutFlowConfigurations(ofconfigs);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
 
       service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
       service0.start();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
    public void testSimpleOutflowFanout() throws Exception
    {
       Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setClustered(true);
       service0Conf.setSecurityEnabled(false);
       Map<String, Object> service0Params = new HashMap<String, Object>();
       service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
                                                   service0Params));
 
       Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setClustered(true);
       service1Conf.setSecurityEnabled(false);
       Map<String, Object> service1Params = new HashMap<String, Object>();
       service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -103,10 +105,10 @@
       
       final SimpleString testAddress = new SimpleString("testaddress");
 
-      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, connectors);
-      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
-      service0Conf.setOutFlowConfigurations(ofconfigs);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
 
       service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
       service0.start();
@@ -168,6 +170,7 @@
    public void testSimpleOutflowRoundRobin() throws Exception
    {
       Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setClustered(true);
       service0Conf.setSecurityEnabled(false);
       Map<String, Object> service0Params = new HashMap<String, Object>();
       service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -176,6 +179,7 @@
                                                   service0Params));
 
       Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setClustered(true);
       service1Conf.setSecurityEnabled(false);
       Map<String, Object> service1Params = new HashMap<String, Object>();
       service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -192,10 +196,10 @@
       
       final SimpleString testAddress = new SimpleString("testaddress");
 
-      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, connectors);
-      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
-      service0Conf.setOutFlowConfigurations(ofconfigs);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
 
       service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
       service0.start();




More information about the jboss-cvs-commits mailing list