[hornetq-commits] JBoss hornetq SVN: r8108 - in branches/hornetq_grouping: src/main/org/hornetq/core/server/group and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 14 08:23:12 EDT 2009
Author: ataylor
Date: 2009-10-14 08:23:12 -0400 (Wed, 14 Oct 2009)
New Revision: 8108
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
a few tweaks
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -698,6 +698,7 @@
public void setGroupingHandler(GroupingHandler groupingHandler)
{
groupingGroupingHandler = groupingHandler;
+ managementService.addNotificationListener(groupingGroupingHandler);
}
public GroupingHandler getGroupingHandler()
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -15,11 +15,13 @@
import org.hornetq.utils.SimpleString;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.Notification;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
-public interface GroupingHandler
+public interface GroupingHandler extends NotificationListener
{
SimpleString getName();
@@ -30,4 +32,6 @@
void send(Response response, int distance) throws Exception;
Response receive(Proposal proposal, int distance) throws Exception;
+
+ void onNotification(Notification notification);
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -34,8 +35,10 @@
{
private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
- private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+ private ConcurrentHashMap<SimpleString, SimpleString> map = new ConcurrentHashMap<SimpleString, SimpleString>();
+ private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+
private final SimpleString name;
private final ManagementService managementService;
@@ -59,12 +62,13 @@
{
if(proposal.getProposal() == null)
{
- Object original = map.get(proposal.getProposalType());
+ SimpleString original = map.get(proposal.getProposalType());
return original == null?null:new Response(proposal.getProposalType(), original);
}
Response response = new Response(proposal.getProposalType(), proposal.getProposal());
if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
{
+ groupMap.put(response.getChosen(), response.getResponseType());
return response;
}
else
@@ -81,8 +85,8 @@
{
TypedProperties props = new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getOriginal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternative());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -94,5 +98,19 @@
{
return propose(proposal);
}
+
+ public void onNotification(Notification notification)
+ {
+ if(notification.getType() == NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ SimpleString val = groupMap.get(clusterName);
+ if(val != null)
+ {
+ groupMap.remove(clusterName);
+ map.remove(val);
+ }
+ }
+ }
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -20,12 +20,12 @@
public class Proposal
{
private final SimpleString proposalType;
- private final Object proposal;
+ private final SimpleString proposal;
public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
- public Proposal(SimpleString proposalType, Object proposal)
+ public Proposal(SimpleString proposalType, SimpleString proposal)
{
this.proposal = proposal;
this.proposalType = proposalType;
@@ -36,7 +36,7 @@
return proposalType;
}
- public Object getProposal()
+ public SimpleString getProposal()
{
return proposal;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -50,6 +50,8 @@
private int waitTime = 1000;
+ private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+
public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
{
this.name = name;
@@ -78,7 +80,7 @@
lock.lock();
TypedProperties props = new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
@@ -104,6 +106,7 @@
{
lock.lock();
responses.put(response.getResponseType(), response);
+ groupMap.put(response.getChosen(), response.getResponseType());
sendCondition.signal();
}
finally
@@ -116,7 +119,7 @@
{
TypedProperties props = new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -129,5 +132,18 @@
{
}
+ public void onNotification(Notification notification)
+ {
+ if(notification.getType() == NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ SimpleString val = groupMap.get(clusterName);
+ if(val != null)
+ {
+ groupMap.remove(clusterName);
+ responses.remove(val);
+ }
+ }
+ }
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -21,18 +21,18 @@
{
private final boolean accepted;
- private final Object original;
+ private final SimpleString original;
- private final Object alternative;
+ private final SimpleString alternative;
private SimpleString responseType;
- public Response(SimpleString responseType, Object original)
+ public Response(SimpleString responseType, SimpleString original)
{
this(responseType, original, null);
}
- public Response(SimpleString responseType, Object original, Object alternative)
+ public Response(SimpleString responseType, SimpleString original, SimpleString alternative)
{
this.responseType = responseType;
this.accepted = alternative == null;
@@ -45,17 +45,17 @@
return accepted;
}
- public Object getOriginal()
+ public SimpleString getOriginal()
{
return original;
}
- public Object getAlternative()
+ public SimpleString getAlternative()
{
return alternative;
}
- public Object getChosen()
+ public SimpleString getChosen()
{
return alternative != null?alternative:original;
}
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -434,34 +434,24 @@
verifyReceiveAllInRange(20, 30, 0);
removeConsumer(0);
+ removeConsumer(1);
+ removeConsumer(2);
deleteQueue(0, "queue0");
- try
- {
- sendInRange(0, "queues.testaddress", 30, 31, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
- }
- try
- {
- sendInRange(1, "queues.testaddress", 31, 32, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
- }
- try
- {
- sendInRange(2, "queues.testaddress", 32, 33, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIST);
- }
+ deleteQueue(1, "queue0");
+ deleteQueue(2, "queue0");
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ addConsumer(3, 0, "queue1", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(30, 50, 3);
System.out.println("*****************************************************************************");
}
finally
More information about the hornetq-commits
mailing list