[hornetq-commits] JBoss hornetq SVN: r8108 - in branches/hornetq_grouping: src/main/org/hornetq/core/server/group and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 14 08:23:12 EDT 2009


Author: ataylor
Date: 2009-10-14 08:23:12 -0400 (Wed, 14 Oct 2009)
New Revision: 8108

Modified:
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
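a few tweaks: GroupingHandler now extends NotificationListener and is registered
with the management service in PostOfficeImpl.setGroupingHandler; the local and
remote grouping handlers record each chosen value in a groupMap and drop the
corresponding group entry on a BINDING_REMOVED notification; Proposal and Response
values are typed as SimpleString instead of Object; ClusteredGroupingTest is
updated to send to a newly created queue after the original queue is deleted.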

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -698,6 +698,7 @@
    public void setGroupingHandler(GroupingHandler groupingHandler)
    {
       groupingGroupingHandler = groupingHandler;
+      managementService.addNotificationListener(groupingGroupingHandler);
    }
 
    public GroupingHandler getGroupingHandler()

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -15,11 +15,13 @@
 import org.hornetq.utils.SimpleString;
 import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.Notification;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
-public interface GroupingHandler
+public interface GroupingHandler extends NotificationListener
 {
    SimpleString getName();
 
@@ -30,4 +32,6 @@
    void send(Response response, int distance) throws Exception;
 
    Response receive(Proposal proposal, int distance) throws Exception;
+
+   void onNotification(Notification notification);
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -26,6 +26,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -34,8 +35,10 @@
 {
    private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
 
-   private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+   private ConcurrentHashMap<SimpleString, SimpleString> map = new ConcurrentHashMap<SimpleString, SimpleString>();
 
+   private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+
    private final SimpleString name;
 
    private final ManagementService managementService;
@@ -59,12 +62,13 @@
    {
       if(proposal.getProposal() == null)
       {
-         Object original = map.get(proposal.getProposalType());
+         SimpleString original = map.get(proposal.getProposalType());
          return original == null?null:new Response(proposal.getProposalType(), original);
       }
       Response response = new Response(proposal.getProposalType(), proposal.getProposal());
       if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
       {
+         groupMap.put(response.getChosen(), response.getResponseType());
          return response;
       }
       else
@@ -81,8 +85,8 @@
    {
       TypedProperties props = new TypedProperties();
       props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getOriginal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternative());
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -94,5 +98,19 @@
    {
       return propose(proposal);
    }
+
+   public void onNotification(Notification notification)
+   {
+      if(notification.getType() == NotificationType.BINDING_REMOVED)
+      {
+         SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+         SimpleString val = groupMap.get(clusterName);
+         if(val != null)
+         {
+            groupMap.remove(clusterName);
+            map.remove(val);
+         }
+      }
+   }
 }
 

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -20,12 +20,12 @@
 public class Proposal
 {
    private final SimpleString proposalType;
-   private final Object proposal;
+   private final SimpleString proposal;
 
    public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
    public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
 
-   public Proposal(SimpleString proposalType, Object proposal)
+   public Proposal(SimpleString proposalType, SimpleString proposal)
    {
       this.proposal = proposal;
       this.proposalType = proposalType;
@@ -36,7 +36,7 @@
       return proposalType;
    }
 
-   public Object getProposal()
+   public SimpleString getProposal()
    {
       return proposal;
    }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -50,6 +50,8 @@
 
    private int waitTime = 1000;
 
+   private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+
    public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
    {
       this.name = name;
@@ -78,7 +80,7 @@
          lock.lock();
          TypedProperties props = new TypedProperties();
          props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
          props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
          props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
@@ -104,6 +106,7 @@
       {
          lock.lock();
          responses.put(response.getResponseType(), response);
+         groupMap.put(response.getChosen(), response.getResponseType());
          sendCondition.signal();
       }
       finally
@@ -116,7 +119,7 @@
    {
       TypedProperties props = new TypedProperties();
       props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -129,5 +132,18 @@
    {
    }
 
+   public void onNotification(Notification notification)
+   {
+      if(notification.getType() == NotificationType.BINDING_REMOVED)
+      {
+         SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+         SimpleString val = groupMap.get(clusterName);
+         if(val != null)
+         {
+            groupMap.remove(clusterName);
+            responses.remove(val);
+         }
+      }
+   }
 }
 

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -21,18 +21,18 @@
 {
    private final boolean accepted;
 
-   private final Object original;
+   private final SimpleString original;
 
-   private final Object alternative;
+   private final SimpleString alternative;
 
    private SimpleString responseType;
 
-   public Response(SimpleString responseType, Object original)
+   public Response(SimpleString responseType, SimpleString original)
    {
       this(responseType, original, null);
    }
 
-   public Response(SimpleString responseType, Object original, Object alternative)
+   public Response(SimpleString responseType, SimpleString original, SimpleString alternative)
    {
       this.responseType = responseType;
       this.accepted = alternative == null;
@@ -45,17 +45,17 @@
       return accepted;
    }
 
-   public Object getOriginal()
+   public SimpleString getOriginal()
    {
       return original;
    }
 
-   public Object getAlternative()
+   public SimpleString getAlternative()
    {
       return alternative;
    }
 
-   public Object getChosen()
+   public SimpleString getChosen()
    {
       return alternative != null?alternative:original;
    }

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 12:23:12 UTC (rev 8108)
@@ -434,34 +434,24 @@
 
          verifyReceiveAllInRange(20, 30, 0);
          removeConsumer(0);
+         removeConsumer(1);
+         removeConsumer(2);
          deleteQueue(0, "queue0");
-         try
-         {
-            sendInRange(0, "queues.testaddress", 30, 31, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-            fail("should throw exception");
-         }
-         catch (HornetQException e)
-         {
-            assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
-         }
-         try
-         {
-            sendInRange(1, "queues.testaddress", 31, 32, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-            fail("should throw exception");
-         }
-         catch (HornetQException e)
-         {
-            assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
-         }
-         try
-         {
-            sendInRange(2, "queues.testaddress", 32, 33, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-            fail("should throw exception");
-         }
-         catch (HornetQException e)
-         {
-            assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
-         }
+         deleteQueue(1, "queue0");
+         deleteQueue(2, "queue0");
+         createQueue(0, "queues.testaddress", "queue1", null, false);
+         addConsumer(3, 0, "queue1", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, false);
+         waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+         sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         verifyReceiveAllInRange(30, 50, 3);
          System.out.println("*****************************************************************************");
       }
       finally



More information about the hornetq-commits mailing list