[hornetq-commits] JBoss hornetq SVN: r8126 - in branches/hornetq_grouping: src/main/org/hornetq/core/config/impl and 15 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 19 07:30:50 EDT 2009


Author: ataylor
Date: 2009-10-19 07:30:49 -0400 (Mon, 19 Oct 2009)
New Revision: 8126

Added:
   branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
Modified:
   branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
some refactoring

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -130,9 +130,9 @@
 
    void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
 
-   List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+   GroupingHandlerConfiguration getGroupingHandlerConfiguration();
 
-   void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+   void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration);
 
    List<BridgeConfiguration> getBridgeConfigurations();
 
@@ -309,5 +309,4 @@
    void setMessageExpiryThreadPriority(int messageExpiryThreadPriority);
 
 
-
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -230,7 +230,7 @@
 
    protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
 
-   protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+   protected GroupingHandlerConfiguration groupingHandlerConfiguration;
 
    // Paging related attributes ------------------------------------------------------------
 
@@ -484,12 +484,12 @@
       this.backupConnectorName = backupConnectorName;
    }
 
-   public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+   public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
    {
       return groupingHandlerConfiguration;
    }
 
-   public void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+   public void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration)
    {
       this.groupingHandlerConfiguration = groupingHandlerConfiguration;
    }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -577,19 +577,16 @@
    }
 
    private void parseGroupingHandlerConfiguration(final Element node)
-      {
-         String name = node.getAttribute("name");
-         String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
-         String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
-         Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
-         GroupingHandlerConfiguration arbitratorConfiguration =
-               new GroupingHandlerConfiguration(new SimpleString(name),
-                                           type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
-                                           new SimpleString(address),
-                                           timeout);
-         System.out.println("arbitratorConfiguration = " + arbitratorConfiguration);
-         groupingHandlerConfiguration.add(arbitratorConfiguration);
-      }
+   {
+      String name = node.getAttribute("name");
+      String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+      String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+      Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
+      groupingHandlerConfiguration = new GroupingHandlerConfiguration(new SimpleString(name),
+                                  type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
+                                  new SimpleString(address),
+                                  timeout);
+   }
 
 
    private void parseBridgeConfiguration(final Element brNode)

Added: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.persistence;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Oct 18, 2009
+ */
+public interface GroupingInfo
+{
+   public SimpleString getClusterName();
+
+   public SimpleString getGroupId();
+
+   public long getId();
+}

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
@@ -111,5 +112,11 @@
    
    void deleteQueueBinding(long queueBindingID) throws Exception;
    
-   void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos) throws Exception;
+   void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
+
+   //grouping relateed operations
+   void addGrouping(GroupBinding groupBinding) throws Exception;
+
+
+   void deleteGrouping(GroupBinding groupBinding) throws Exception;
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -50,6 +50,7 @@
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -58,6 +59,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
@@ -85,6 +87,8 @@
 
    private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
 
+   //grouping journal record type
+   public static final byte GROUP_RECORD = 41;
    // Bindings journal record type
 
    public static final byte QUEUE_BINDING_RECORD = 21;
@@ -988,7 +992,19 @@
          resourceManager.putTransaction(xid, tx);
       }
    }
+   //grouping handler operations
+   public void addGrouping(GroupBinding groupBinding) throws Exception
+   {
+      GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName());
 
+      bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true);
+   }
+
+   public void deleteGrouping(GroupBinding groupBinding) throws Exception
+   {
+      bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+   }
+
    // Bindings operations
 
    public void addQueueBinding(final Binding binding) throws Exception
@@ -1011,7 +1027,7 @@
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
 
-   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -1045,6 +1061,13 @@
 
             persistentID = encoding.uuid;
          }
+         else if(rec == GROUP_RECORD)
+         {
+            GroupingEncoding encoding = new GroupingEncoding();
+            encoding.decode(buffer);
+            encoding.setId(id);
+            groupingInfos.add(encoding);
+         }
          else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
          {
             idGenerator.loadState(record.id, buffer);
@@ -1259,6 +1282,63 @@
       }
    }
 
+   private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+   {
+      long id;
+
+      SimpleString groupId;
+
+      SimpleString clusterName;
+
+      public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+      {
+         this.id = id;
+         this.groupId = groupId;
+         this.clusterName = clusterName;
+      }
+
+      public GroupingEncoding()
+      {
+      }
+
+      public int getEncodeSize()
+      {
+         return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
+      }
+
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeSimpleString(groupId);
+         buffer.writeSimpleString(clusterName);
+      }
+
+      public void decode(HornetQBuffer buffer)
+      {
+         groupId = buffer.readSimpleString();
+         clusterName = buffer.readSimpleString();
+      }
+
+      public long getId()
+      {
+         return id;
+      }
+
+      public void setId(long id)
+      {
+         this.id = id;
+      }
+
+      public SimpleString getGroupId()
+      {
+         return groupId;
+      }
+
+      public SimpleString getClusterName()
+      {
+         return clusterName;
+      }
+   }
+
    private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
    {
       long id;

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -24,11 +24,13 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
@@ -74,7 +76,7 @@
    {
    }
 
-   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+   public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
    {
 
    }
@@ -247,4 +249,14 @@
    {
    }
 
+   public void addGrouping(GroupBinding groupBinding) throws Exception
+   {
+      //To change body of implemented methods use File | Settings | File Templates.
+   }
+
+   public void deleteGrouping(GroupBinding groupBinding) throws Exception
+   {
+      //To change body of implemented methods use File | Settings | File Templates.
+   }
+
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -64,8 +64,4 @@
    void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
    
    Object getNotificationLock();
-
-   void setGroupingHandler(GroupingHandler groupingHandler);
-
-   GroupingHandler getGroupingHandler();
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -28,7 +28,6 @@
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
-import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Bindable;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
@@ -60,11 +59,11 @@
 
    private volatile boolean routeWhenNoConsumers;
 
-   private final PostOffice postOffice;
+   private final GroupingHandler groupingHandler;
 
-   public BindingsImpl(PostOffice postOffice)
+   public BindingsImpl(GroupingHandler groupingHandler)
    {
-      this.postOffice = postOffice;
+      this.groupingHandler = groupingHandler;
    }
 
    public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
@@ -278,15 +277,13 @@
 
       if (!routed)
       {
-         GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
-
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
          {
             routed = routeFromCluster(message, tx);
          }
-         else if (groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
+         else if (groupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
          {
-            routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
+            routeUsingStrictOrdering(message, tx, groupingHandler);
          }
          else
          {
@@ -462,12 +459,12 @@
 
             resp = groupingGroupingHandler.propose(new Proposal(fullID, chosen.getClusterName()));
 
-            if (resp.getAlternative() != null)
+            if (resp.getAlternativeClusterName() != null)
             {
                chosen = null;
                for (Binding binding : bindings)
                {
-                  if (binding.getClusterName().equals(resp.getAlternative()))
+                  if (binding.getClusterName().equals(resp.getAlternativeClusterName()))
                   {
                      chosen = binding;
                      break;
@@ -483,7 +480,7 @@
             }
             else
             {
-               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
             }
          }
          else
@@ -491,7 +488,7 @@
             Binding chosen = null;
             for (Binding binding : bindings)
             {
-               if (binding.getClusterName().equals(resp.getChosen()))
+               if (binding.getClusterName().equals(resp.getChosenClusterName()))
                {
                   chosen = binding;
                   break;
@@ -505,7 +502,7 @@
             }
             else
             {
-               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
             }
          }
 

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -111,7 +111,7 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private GroupingHandler groupingGroupingHandler;
+   private final HornetQServer server;
 
    public PostOfficeImpl(final HornetQServer server,
                          final StorageManager storageManager,
@@ -124,7 +124,7 @@
                          final int idCacheSize,
                          final boolean persistIDCache,
                          final ExecutorFactory orderedExecutorFactory,
-                         HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                         final HierarchicalRepository<AddressSettings> addressSettingsRepository)
 
    {
       this.storageManager = storageManager;
@@ -155,6 +155,8 @@
       this.redistributorExecutorFactory = orderedExecutorFactory;
 
       this.addressSettingsRepository = addressSettingsRepository;
+
+      this.server = server;
    }
 
    // HornetQComponent implementation ---------------------------------------
@@ -694,18 +696,6 @@
       return notificationLock;
    }
 
-
-   public void setGroupingHandler(GroupingHandler groupingHandler)
-   {
-      groupingGroupingHandler = groupingHandler;
-      managementService.addNotificationListener(groupingGroupingHandler);
-   }
-
-   public GroupingHandler getGroupingHandler()
-   {
-      return groupingGroupingHandler;
-   }
-
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
@@ -1047,6 +1037,6 @@
 
    public Bindings createBindings()
    {
-      return new BindingsImpl(this);
+      return new BindingsImpl(server.getGroupingHandler());
    }
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Set;
+import java.nio.channels.DatagramChannel;
 
 import javax.management.MBeanServer;
 
@@ -30,6 +31,7 @@
 import org.hornetq.core.security.HornetQSecurityManager;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
@@ -126,4 +128,8 @@
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 
    ExecutorFactory getExecutorFactory();
+
+   void setGroupingHandler(GroupingHandler groupingHandler);
+
+   GroupingHandler getGroupingHandler();
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -554,10 +554,10 @@
          }
          SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
          Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
-         Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
+         Response response = server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
          if(response != null)
          {
-            postOffice.getGroupingHandler().send(response, 0);
+            server.getGroupingHandler().send(response, 0);
          }
       }
 
@@ -572,8 +572,8 @@
          SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
          Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
          Response response = new Response(type, val, alt);
-         postOffice.getGroupingHandler().proposed(response);
-         postOffice.getGroupingHandler().send(response, hops + 1);
+         server.getGroupingHandler().proposed(response);
+         server.getGroupingHandler().send(response, hops + 1);
       }
       
       private synchronized void clearBindings() throws Exception

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -37,7 +37,6 @@
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -149,11 +148,6 @@
       {
          deployClusterConnection(config);
       }
-      
-      for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
-      {
-         deployGroupingHandlerConfigurations(config);
-      }
 
       started = true;
    }
@@ -486,21 +480,6 @@
       bridge.start();
    }
 
-   private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
-   {
-      GroupingHandler groupingHandler;
-      if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
-      {
-         groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress());
-      }
-      else
-      {
-         groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
-      }
-      log.info("deploying grouping handler: " + groupingHandler);
-      postOffice.setGroupingHandler(groupingHandler);
-   }
-
    private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
    {
       if (config.getName() == null)

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
 import org.hornetq.utils.SimpleString;
 import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.management.NotificationListener;
 import org.hornetq.core.management.Notification;
 
@@ -33,5 +34,5 @@
 
    Response receive(Proposal proposal, int distance) throws Exception;
 
-   void onNotification(Notification notification);
+   void addGroupBinding(GroupBinding groupBinding);
 }

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Oct 19, 2009
+ */
+public class GroupBinding
+{
+   private long id;
+
+   private final SimpleString groupId;
+
+   private final SimpleString clusterName;
+
+   public GroupBinding(SimpleString groupId, SimpleString clusterName)
+   {
+      this.groupId = groupId;
+      this.clusterName = clusterName;
+   }
+
+   public GroupBinding(long id, SimpleString groupId, SimpleString clusterName)
+   {
+      this.id = id;
+      this.groupId = groupId;
+      this.clusterName = clusterName;
+   }
+
+   public long getId()
+   {
+      return id;
+   }
+
+   public void setId(long id)
+   {
+      this.id = id;
+   }
+
+   public SimpleString getGroupId()
+   {
+      return groupId;
+   }
+
+   public SimpleString getClusterName()
+   {
+      return clusterName;
+   }
+}

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,13 +19,11 @@
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 
 /**
@@ -35,21 +33,23 @@
 {
    private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
 
-   private ConcurrentHashMap<SimpleString, SimpleString> map = new ConcurrentHashMap<SimpleString, SimpleString>();
+   private ConcurrentHashMap<SimpleString, GroupBinding> map = new ConcurrentHashMap<SimpleString, GroupBinding>();
 
-   private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+   private HashMap<SimpleString, GroupBinding> groupMap = new HashMap<SimpleString, GroupBinding>();
 
    private final SimpleString name;
 
    private final ManagementService managementService;
 
    private SimpleString address;
+   private StorageManager storageManager;
 
-   public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, StorageManager storageManager)
    {
       this.managementService = managementService;
       this.name = name;
       this.address = address;
+      this.storageManager = storageManager;
    }
 
    public SimpleString getName()
@@ -60,20 +60,23 @@
 
    public Response propose(Proposal proposal) throws Exception
    {
-      if(proposal.getProposal() == null)
+      if(proposal.getClusterName() == null)
       {
-         SimpleString original = map.get(proposal.getProposalType());
-         return original == null?null:new Response(proposal.getProposalType(), original);
+         GroupBinding original = map.get(proposal.getGroupId());
+         return original == null?null:new Response(proposal.getGroupId(), original.getClusterName());
       }
-      Response response = new Response(proposal.getProposalType(), proposal.getProposal());
-      if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+      GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
+      if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
       {
-         groupMap.put(response.getChosen(), response.getResponseType());
-         return response;
+         groupBinding.setId(storageManager.generateUniqueID());
+         groupMap.put(groupBinding.getClusterName(), groupBinding);
+         storageManager.addGrouping(groupBinding);
+         return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
       }
       else
       {
-         return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+         groupBinding = map.get(proposal.getGroupId());
+         return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
       }
    }
 
@@ -84,9 +87,9 @@
    public void send(Response response, int distance) throws Exception
    {
       TypedProperties props = new TypedProperties();
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getOriginal());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternative());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getGroupId());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getClusterName());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternativeClusterName());
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -99,16 +102,30 @@
       return propose(proposal);
    }
 
+   public void addGroupBinding(GroupBinding groupBinding)
+   {
+      map.put(groupBinding.getGroupId(), groupBinding);
+      groupMap.put(groupBinding.getClusterName(), groupBinding);
+   }
+
    public void onNotification(Notification notification)
    {
       if(notification.getType() == NotificationType.BINDING_REMOVED)
       {
          SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-         SimpleString val = groupMap.get(clusterName);
+         GroupBinding val = groupMap.get(clusterName);
          if(val != null)
          {
             groupMap.remove(clusterName);
-            map.remove(val);
+            map.remove(val.getGroupId());
+            try
+            {
+               storageManager.deleteGrouping(val);
+            }
+            catch (Exception e)
+            {
+               log.warn("Unable to delete group binding info " + val.getGroupId(), e);
+            }
          }
       }
    }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,26 +19,26 @@
  */
 public class Proposal
 {
-   private final SimpleString proposalType;
-   private final SimpleString proposal;
+   private final SimpleString groupId;
+   private final SimpleString clusterName;
 
    public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
    public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
 
-   public Proposal(SimpleString proposalType, SimpleString proposal)
+   public Proposal(SimpleString groupId, SimpleString clusterName)
    {
-      this.proposal = proposal;
-      this.proposalType = proposalType;
+      this.clusterName = clusterName;
+      this.groupId = groupId;
    }
 
-   public SimpleString getProposalType()
+   public SimpleString getGroupId()
    {
-      return proposalType;
+      return groupId;
    }
 
-   public SimpleString getProposal()
+   public SimpleString getClusterName()
    {
-      return proposal;
+      return clusterName;
    }
 }
 

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -67,12 +67,12 @@
 
    public Response propose(final Proposal proposal) throws Exception
    {
-      Response response = responses.get(proposal.getProposalType());
+      Response response = responses.get(proposal.getGroupId());
       if( response != null)
       {
          return response;
       }
-      if (proposal.getProposal() == null)
+      if (proposal.getClusterName() == null)
       {
          return null;
       }
@@ -80,15 +80,15 @@
       {
          lock.lock();
          TypedProperties props = new TypedProperties();
-         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
          props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
          props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
          Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
          managementService.sendNotification(notification);
          sendCondition.await(timeout, TimeUnit.MILLISECONDS);
-         response = responses.get(proposal.getProposalType());
+         response = responses.get(proposal.getGroupId());
       }
       finally
       {
@@ -96,7 +96,7 @@
       }
       if(response == null)
       {
-         throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+         throw new IllegalStateException("no response received from group handler for " + proposal.getGroupId());
       }
       return response;
    }
@@ -106,8 +106,8 @@
       try
       {
          lock.lock();
-         responses.put(response.getResponseType(), response);
-         groupMap.put(response.getChosen(), response.getResponseType());
+         responses.put(response.getGroupId(), response);
+         groupMap.put(response.getChosenClusterName(), response.getGroupId());
          sendCondition.signal();
       }
       finally
@@ -119,8 +119,8 @@
    public Response receive(Proposal proposal, int distance) throws Exception
    {
       TypedProperties props = new TypedProperties();
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -133,6 +133,11 @@
    {
    }
 
+   public void addGroupBinding(GroupBinding groupBinding)
+   {
+      
+   }
+
    public void onNotification(Notification notification)
    {
       if(notification.getType() == NotificationType.BINDING_REMOVED)

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -21,23 +21,23 @@
 {
    private final boolean accepted;
 
-   private final SimpleString original;
+   private final SimpleString clusterName;
 
-   private final SimpleString alternative;
+   private final SimpleString alternativeClusterName;
 
-   private SimpleString responseType;
+   private SimpleString groupId;
 
-   public Response(SimpleString responseType, SimpleString original)
+   public Response(SimpleString groupId, SimpleString clusterName)
    {
-      this(responseType, original, null);
+      this(groupId, clusterName, null);
    }
 
-   public Response(SimpleString responseType, SimpleString original, SimpleString alternative)
+   public Response(SimpleString groupId, SimpleString clusterName, SimpleString alternativeClusterName)
    {
-      this.responseType = responseType;
-      this.accepted = alternative == null;
-      this.original = original;
-      this.alternative = alternative;
+      this.groupId = groupId;
+      this.accepted = alternativeClusterName == null;
+      this.clusterName = clusterName;
+      this.alternativeClusterName = alternativeClusterName;
    }
 
    public boolean isAccepted()
@@ -45,29 +45,29 @@
       return accepted;
    }
 
-   public SimpleString getOriginal()
+   public SimpleString getClusterName()
    {
-      return original;
+      return clusterName;
    }
 
-   public SimpleString getAlternative()
+   public SimpleString getAlternativeClusterName()
    {
-      return alternative;
+      return alternativeClusterName;
    }
 
-   public SimpleString getChosen()
+   public SimpleString getChosenClusterName()
    {
-      return alternative != null?alternative:original;
+      return alternativeClusterName != null? alternativeClusterName : clusterName;
    }
 
    @Override
    public String toString()
    {
-      return "accepted = " + accepted + " original = " + original + " alternative = " + alternative;
+      return "accepted = " + accepted + " clusterName = " + clusterName + " alternativeClusterName = " + alternativeClusterName;
    }
 
-   public SimpleString getResponseType()
+   public SimpleString getGroupId()
    {
-      return responseType;
+      return groupId;
    }
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -57,6 +57,7 @@
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
@@ -84,6 +85,8 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.*;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -194,6 +197,8 @@
 
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
 
+   private GroupingHandler groupingHandler;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -309,6 +314,11 @@
          clusterManager.stop();
       }
 
+      if(groupingHandler != null)
+      {
+         managementService.removeNotificationListener(groupingHandler);
+         groupingHandler = null;
+      }
       // Need to flush all sessions to make sure all confirmations get sent back to client
 
       for (ServerSession session : sessions.values())
@@ -864,6 +874,17 @@
       return executorFactory;
    }
 
+   public void setGroupingHandler(GroupingHandler groupingHandler)
+   {
+      this.groupingHandler = groupingHandler;
+     // managementService.addNotificationListener(groupingHandler);
+   }
+
+   public GroupingHandler getGroupingHandler()
+   {
+      return groupingHandler;
+   }
+
    // Public
    // ---------------------------------------------------------------------------------------
 
@@ -1114,6 +1135,8 @@
          clusterManager.start();
       }
 
+      deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
+
       if (deploymentManager != null)
       {
          deploymentManager.start();
@@ -1154,8 +1177,10 @@
    {
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
-      storageManager.loadBindingJournal(queueBindingInfos);
+      List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
 
+      storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+
       // Set the node id - must be before we load the queues into the postoffice, but after we load the journal
       setNodeID();
 
@@ -1187,6 +1212,14 @@
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
       }
 
+      for (GroupingInfo groupingInfo : groupingInfos)
+      {
+         if(groupingHandler != null)
+         {
+            groupingHandler.addGroupBinding(new GroupBinding(groupingInfo.getId(), groupingInfo.getGroupId(), groupingInfo.getClusterName()));
+         }
+      }
+
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
       storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
@@ -1348,6 +1381,25 @@
       }
    }
 
+   private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
+   {
+      if (config != null)
+      {
+         GroupingHandler groupingHandler;
+         if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+         {
+            groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), getStorageManager());
+         }
+         else
+         {
+            groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
+         }
+         log.info("deploying grouping handler: " + groupingHandler);
+         this.groupingHandler = groupingHandler;
+         managementService.addNotificationListener(groupingHandler);
+      }
+   }
+
    private Transformer instantiateTransformer(final String transformerClassName)
    {
       Transformer transformer = null;

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -45,8 +45,6 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.group.impl.LocalGroupingHandler;
-import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
@@ -60,25 +58,23 @@
  * A ClusterTestBase
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 30 Jan 2009 11:29:43
- *
- *
+ *         <p/>
+ *         Created 30 Jan 2009 11:29:43
  */
 public class ClusterTestBase extends ServiceTestBase
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
    private static final int[] PORTS = {TransportConstants.DEFAULT_PORT,
-                                       TransportConstants.DEFAULT_PORT + 1,
-                                       TransportConstants.DEFAULT_PORT + 2,
-                                       TransportConstants.DEFAULT_PORT + 3,
-                                       TransportConstants.DEFAULT_PORT + 4,
-                                       TransportConstants.DEFAULT_PORT + 5,
-                                       TransportConstants.DEFAULT_PORT + 6,
-                                       TransportConstants.DEFAULT_PORT + 7,
-                                       TransportConstants.DEFAULT_PORT + 8,
-                                       TransportConstants.DEFAULT_PORT + 9,
+         TransportConstants.DEFAULT_PORT + 1,
+         TransportConstants.DEFAULT_PORT + 2,
+         TransportConstants.DEFAULT_PORT + 3,
+         TransportConstants.DEFAULT_PORT + 4,
+         TransportConstants.DEFAULT_PORT + 5,
+         TransportConstants.DEFAULT_PORT + 6,
+         TransportConstants.DEFAULT_PORT + 7,
+         TransportConstants.DEFAULT_PORT + 8,
+         TransportConstants.DEFAULT_PORT + 9,
    };
 
    private static final long WAIT_TIMEOUT = 10000;
@@ -89,21 +85,21 @@
       super.setUp();
 
       checkFreePort(PORTS);
-      
+
       clearData();
    }
-   
+
    @Override
    protected void tearDown() throws Exception
    {
       checkFreePort(PORTS);
-      
+
       servers = null;
 
       sfs = null;
-      
+
       consumers = null;
-      
+
       consumers = new ConsumerHolder[MAX_CONSUMERS];
 
       super.tearDown();
@@ -173,8 +169,8 @@
       //System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
       throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
-                                      ", expecting = " +
-                                      count);
+            ", expecting = " +
+            count);
    }
 
    protected void waitForBindings(int node,
@@ -219,7 +215,7 @@
          {
             if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
             {
-               QueueBinding qBinding = (QueueBinding)binding;
+               QueueBinding qBinding = (QueueBinding) binding;
 
                bindingCount++;
 
@@ -242,8 +238,8 @@
       // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
       String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
-                   ", totConsumers = " +
-                   totConsumers;
+            ", totConsumers = " +
+            totConsumers;
 
       log.error(msg);
 
@@ -424,7 +420,7 @@
    {
       sendInRange(node, address, 0, numMessages, durable, key, val);
    }
-   
+
    protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
    {
       ClientSessionFactory sf = this.sfs[node];
@@ -450,28 +446,20 @@
       session.close();
    }
 
-   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type,  int node)
+   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
    {
       setUpGroupHandler(type, node, 5000);
    }
 
-   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type,  int node, int timeout)
+   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
    {
-      GroupingHandler groupingHandler;
-      if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
-      {
-         groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
-      }
-      else
-      {
-         groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), timeout);
-      }
-      this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+      this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
+            new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), type, new SimpleString("queues"), timeout));
    }
 
-   protected void setUpGroupHandler(GroupingHandler groupingHandler,  int node)
+   protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
    {
-      this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+      this.servers[node].setGroupingHandler(groupingHandler);
    }
 
    protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
@@ -490,25 +478,25 @@
    }
 
    protected void verifyReceiveAllWithGroupIDRoundRobin(
-                                                int msgStart,
-                                                int msgEnd,
-                                                int... consumerIDs) throws Exception
+         int msgStart,
+         int msgEnd,
+         int... consumerIDs) throws Exception
    {
-      verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);   
+      verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
    }
 
    protected int verifyReceiveAllOnSingleConsumer(int msgStart,
-                                                int msgEnd,
-                                                int... consumerIDs) throws Exception
+                                                  int msgEnd,
+                                                  int... consumerIDs) throws Exception
    {
-      return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);  
+      return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
    }
 
    protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
-                                                long firstReceiveTime,
-                                                int msgStart,
-                                                int msgEnd,
-                                                int... consumerIDs) throws Exception
+                                                        long firstReceiveTime,
+                                                        int msgStart,
+                                                        int msgEnd,
+                                                        int... consumerIDs) throws Exception
    {
       HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
       for (int i = 0; i < consumerIDs.length; i++)
@@ -544,8 +532,8 @@
             }
 
             SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
-               System.out.println("received " + id + " on consumer " + i);
-            if(groupIdsReceived.get(id) == null)
+            System.out.println("received " + id + " on consumer " + i);
+            if (groupIdsReceived.get(id) == null)
             {
                groupIdsReceived.put(id, i);
             }
@@ -562,9 +550,9 @@
    }
 
    protected int verifyReceiveAllOnSingleConsumer(boolean ack,
-                                                int msgStart,
-                                                int msgEnd,
-                                                int... consumerIDs) throws Exception
+                                                  int msgStart,
+                                                  int msgEnd,
+                                                  int... consumerIDs) throws Exception
    {
       int groupIdsReceived = -1;
       for (int i = 0; i < consumerIDs.length; i++)
@@ -628,7 +616,7 @@
 
                assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
             }
-           
+
             if (ack)
             {
                message.acknowledge();
@@ -639,7 +627,7 @@
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
             }
 
-            if (j != (Integer)(message.getProperty(COUNT_PROP)))
+            if (j != (Integer) (message.getProperty(COUNT_PROP)))
             {
                outOfOrder = true;
                System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
@@ -697,8 +685,8 @@
             if (message != null)
             {
                log.info("check receive Consumer " + consumerIDs[i] +
-                        " received message " +
-                        message.getProperty(COUNT_PROP));
+                     " received message " +
+                     message.getProperty(COUNT_PROP));
             }
             else
             {
@@ -769,7 +757,7 @@
 
             if (message != null)
             {
-               int count = (Integer)message.getProperty(COUNT_PROP);
+               int count = (Integer) message.getProperty(COUNT_PROP);
 
                Integer prevCount = countMap.get(i);
 
@@ -793,7 +781,7 @@
             }
             else
             {
-              // log.info("consumer " + consumerIDs[i] +" returns null");
+               // log.info("consumer " + consumerIDs[i] +" returns null");
             }
          }
          while (message != null);
@@ -831,7 +819,7 @@
 
             if (message != null)
             {
-               int count = (Integer)message.getProperty(COUNT_PROP);
+               int count = (Integer) message.getProperty(COUNT_PROP);
 
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
@@ -879,7 +867,7 @@
 
          assertNotNull(list);
 
-         int elem = (Integer)list.poll();
+         int elem = (Integer) list.poll();
 
          assertEquals(messageCounts[i], elem);
 
@@ -919,7 +907,7 @@
 
          if (message != null)
          {
-            int count = (Integer)message.getProperty(COUNT_PROP);
+            int count = (Integer) message.getProperty(COUNT_PROP);
 
             ints.add(count);
          }
@@ -1015,7 +1003,7 @@
       }
 
       ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
-      
+
       sf.setFailoverOnServerShutdown(false);
       sf.setRetryInterval(100);
       sf.setRetryIntervalMultiplier(1d);
@@ -1127,6 +1115,78 @@
       servers[node] = server;
    }
 
+   protected void setupSharedStorageServer(int node, boolean fileStorage, boolean netty, int backupNode)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
+
+      Configuration configuration = new ConfigurationImpl();
+
+      configuration.setSecurityEnabled(false);
+      configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalMaxAIO(1000);
+      configuration.setJournalDirectory(getJournalDir(backupNode, false));
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(JournalType.ASYNCIO);
+      configuration.setJournalMaxAIO(1000);
+      configuration.setPagingDirectory(getPageDir(backupNode, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
+      configuration.setClustered(true);
+      configuration.setJournalCompactMinFiles(0);
+      configuration.setBackup(true);
+      configuration.setSharedStore(true);
+
+      if (backupNode != -1)
+      {
+         Map<String, Object> backupParams = generateParams(backupNode, netty);
+
+         if (netty)
+         {
+            TransportConfiguration nettyBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+
+            configuration.getConnectorConfigurations().put(nettyBackuptc.getName(), nettyBackuptc);
+
+            configuration.setBackupConnectorName(nettyBackuptc.getName());
+         }
+         else
+         {
+            TransportConfiguration invmBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+
+            configuration.getConnectorConfigurations().put(invmBackuptc.getName(), invmBackuptc);
+
+            configuration.setBackupConnectorName(invmBackuptc.getName());
+         }
+      }
+
+      configuration.getAcceptorConfigurations().clear();
+
+      Map<String, Object> params = generateParams(node, netty);
+
+      TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+      configuration.getAcceptorConfigurations().add(invmtc);
+
+      if (netty)
+      {
+         TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+         configuration.getAcceptorConfigurations().add(nettytc);
+      }
+
+      HornetQServer server;
+
+      if (fileStorage)
+      {
+         server = HornetQ.newHornetQServer(configuration);
+      }
+      else
+      {
+         server = HornetQ.newHornetQServer(configuration, false);
+      }
+      servers[node] = server;
+   }
+
    protected void setupServerWithDiscovery(int node,
                                            String groupAddress,
                                            int port,
@@ -1223,21 +1283,21 @@
          configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
 
          connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
-                                                     nettyBackuptc == null ? null : nettyBackuptc.getName()));
+               nettyBackuptc == null ? null : nettyBackuptc.getName()));
       }
       else
       {
          connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
-                                                                                             : invmBackuptc.getName()));
+               : invmBackuptc.getName()));
       }
 
       BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                             null,
-                                                                             -1,
-                                                                             groupAddress,
-                                                                             port,
-                                                                             250,
-                                                                             connectorPairs);
+            null,
+            -1,
+            groupAddress,
+            port,
+            250,
+            connectorPairs);
 
       configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
@@ -1266,7 +1326,7 @@
       if (netty)
       {
          params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
-                    org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+               org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
       }
 
       return params;
@@ -1333,12 +1393,12 @@
       pairs.add(connectorPair);
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                      address,
-                                                                                      100,
-                                                                                      true,
-                                                                                      forwardWhenNoConsumers,
-                                                                                      maxHops,
-                                                                                      pairs);
+            address,
+            100,
+            true,
+            forwardWhenNoConsumers,
+            maxHops,
+            pairs);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
@@ -1384,12 +1444,12 @@
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                      address,
-                                                                                      250,
-                                                                                      true,
-                                                                                      forwardWhenNoConsumers,
-                                                                                      maxHops,
-                                                                                      pairs);
+            address,
+            250,
+            true,
+            forwardWhenNoConsumers,
+            maxHops,
+            pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1454,12 +1514,12 @@
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                      address,
-                                                                                      250,
-                                                                                      true,
-                                                                                      forwardWhenNoConsumers,
-                                                                                      maxHops,
-                                                                                      pairs);
+            address,
+            250,
+            true,
+            forwardWhenNoConsumers,
+            maxHops,
+            pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1480,12 +1540,12 @@
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                      address,
-                                                                                      100,
-                                                                                      true,
-                                                                                      forwardWhenNoConsumers,
-                                                                                      maxHops,
-                                                                                      discoveryGroupName);
+            address,
+            100,
+            true,
+            forwardWhenNoConsumers,
+            maxHops,
+            discoveryGroupName);
       List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
 
       clusterConfs.add(clusterConf);

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -16,6 +16,7 @@
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.Response;
 import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.management.Notification;
@@ -42,13 +43,14 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -98,6 +100,10 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+      
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
       startServers(0, 1, 2);
 
       try
@@ -106,7 +112,7 @@
          {
             public SimpleString getName()
             {
-               return null;
+                  return null;
             }
 
             public Response propose(Proposal proposal) throws Exception
@@ -116,12 +122,12 @@
 
             public void proposed(Response response) throws Exception
             {
-
+               System.out.println("ClusteredGroupingTest.proposed");
             }
 
             public void send(Response response, int distance) throws Exception
             {
-
+               System.out.println("ClusteredGroupingTest.send");
             }
 
             public Response receive(Proposal proposal, int distance) throws Exception
@@ -131,11 +137,14 @@
 
             public void onNotification(Notification notification)
             {
+               System.out.println("ClusteredGroupingTest.onNotification");
+            }
 
+            public void addGroupBinding(GroupBinding groupBinding)
+            {
+               System.out.println("ClusteredGroupingTest.addGroupBinding");
             }
          }, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -192,14 +201,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -251,14 +261,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -313,14 +324,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -373,14 +385,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -436,14 +449,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -496,14 +510,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
@@ -547,11 +562,11 @@
          waitForBindings(2, "queues.testaddress", 1, 1, false);
 
          sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+         verifyReceiveAllInRange(30, 40, 3);
          sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+         verifyReceiveAllInRange(40, 50, 3);
          sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-         verifyReceiveAllInRange(30, 50, 3);
+         verifyReceiveAllInRange(50, 60, 3);
          System.out.println("*****************************************************************************");
       }
       finally
@@ -576,13 +591,14 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -642,13 +658,14 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -677,8 +694,6 @@
          sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
-         sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
          stopServers(1);
 
          startServers(1);
@@ -688,7 +703,7 @@
          waitForBindings(1, "queues.testaddress", 1, 1, true);
 
 
-         verifyReceiveAllInRange(10, 30, 1);
+         verifyReceiveAllInRange(10, 20, 1);
 
 
          System.out.println("*****************************************************************************");
@@ -716,13 +731,14 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -788,13 +804,15 @@
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -817,18 +835,19 @@
          sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
          verifyReceiveAllInRange(0, 10, 0);
-
+         closeSessionFactory(0);
          stopServers(0);
 
          sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
          startServers(0);
-
          waitForBindings(0, "queues.testaddress", 1, 0, true);
+         setupSessionFactory(0, isNetty());
+         verifyReceiveAllInRange(10, 20, 0);
          sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         verifyReceiveAllInRange(10, 20, 0);
          verifyReceiveAllInRange(20, 30, 0);
 
          System.out.println("*****************************************************************************");
@@ -854,14 +873,14 @@
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -919,14 +938,14 @@
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
 
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
       startServers(0, 1, 2);
 
       try
       {
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -951,9 +970,9 @@
          CountDownLatch latch = new CountDownLatch(1);
          Thread[] threads = new Thread[9];
          int range = 0;
-         for(int i = 0 ; i < 9; i++,range+=10)
+         for (int i = 0; i < 9; i++, range += 10)
          {
-            threads[i] = new Thread(new ThreadSender(range, range+10, 1, new SimpleString("id" + i), latch, i < 8));
+            threads[i] = new Thread(new ThreadSender(range, range + 10, 1, new SimpleString("id" + i), latch, i < 8));
          }
          for (Thread thread : threads)
          {
@@ -975,6 +994,61 @@
    }
 
 
+   public void testGroupingLocalHandlerFails() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupSharedStorageServer(2, isFileStorage(), isNetty(), 0);
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+
+      setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+
+
+      startServers(0, 1, 2);
+
+      try
+      {
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, false);
+         waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+         sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         verifyReceiveAll(10, 0);
+         closeSessionFactory(0);
+         stopServers(0);
+         Thread.sleep(5000);
+         //setupSessionFactory(3, isNetty());
+         sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         closeAllConsumers();
+
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
+      }
+   }
+
    public boolean isNetty()
    {
       return true;

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -23,6 +23,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.Queue;
@@ -70,7 +71,7 @@
 
          List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
-         journal.loadBindingJournal(queueBindingInfos);
+         journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
 
          Map<Long, Queue> queues = new HashMap<Long, Queue>();
 
@@ -88,7 +89,7 @@
 
          queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
-         journal.loadBindingJournal(queueBindingInfos);
+         journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
 
          journal.start();
       }

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -43,6 +43,7 @@
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -50,6 +51,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -842,7 +844,15 @@
       {
       }
 
-      /* (non-Javadoc)
+      public void addGrouping(GroupBinding groupBinding) throws Exception
+      {
+         //To change body of implemented methods use File | Settings | File Templates.
+      }
+
+      public void deleteGrouping(GroupBinding groupBinding) throws Exception
+      {
+         //To change body of implemented methods use File | Settings | File Templates.
+      }/* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#addQueueBinding(org.hornetq.core.postoffice.Binding)
        */
       public void addQueueBinding(final Binding binding) throws Exception
@@ -933,13 +943,14 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
        */
-      public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+      public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
       {
+         
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
-       */
+      * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
+      */
       public void loadMessageJournal(PagingManager pagingManager,
                                      ResourceManager resourceManager,
                                      Map<Long, Queue> queues,

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
@@ -86,7 +87,7 @@
          journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
 
          journal.start();
-         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
          HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
@@ -108,7 +109,7 @@
 
          journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
          journal.start();
-         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
          journal.loadMessageJournal(new FakePagingManager(),
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
@@ -135,7 +136,7 @@
 
          journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
          journal.start();
-         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+         journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
          journal.loadMessageJournal(new FakePagingManager(),
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-10-19 11:30:49 UTC (rev 8126)
@@ -150,13 +150,4 @@
    {
 
    }
-
-   public void setGroupingHandler(GroupingHandler groupingHandler)
-   {
-   }
-
-   public GroupingHandler getGroupingHandler()
-   {
-      return null;
-   }
 }
\ No newline at end of file



More information about the hornetq-commits mailing list