From do-not-reply at jboss.org Wed Oct 14 08:23:13 2009
Content-Type: multipart/mixed; boundary="===============5057126316595932139=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r8108 - in
branches/hornetq_grouping: src/main/org/hornetq/core/server/group and 2 other
directories.
Date: Wed, 14 Oct 2009 08:23:12 -0400
Message-ID: <200910141223.n9ECNCPL001527@svn01.web.mwc.hst.phx2.redhat.com>
--===============5057126316595932139==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: ataylor
Date: 2009-10-14 08:23:12 -0400 (Wed, 14 Oct 2009)
New Revision: 8108
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Post=
OfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupin=
gHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Lo=
calGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Pr=
oposal.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Re=
moteGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Re=
sponse.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluste=
r/distribution/ClusteredGroupingTest.java
Log:
a few tweaks
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/im=
pl/PostOfficeImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Pos=
tOfficeImpl.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Pos=
tOfficeImpl.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -698,6 +698,7 @@
public void setGroupingHandler(GroupingHandler groupingHandler)
{
groupingGroupingHandler =3D groupingHandler;
+ managementService.addNotificationListener(groupingGroupingHandler);
}
=
public GroupingHandler getGroupingHandler()
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/=
GroupingHandler.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupi=
ngHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupi=
ngHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -15,11 +15,13 @@
import org.hornetq.utils.SimpleString;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.Notification;
=
/**
* @author Andy Taylor
*/
-public interface GroupingHandler
+public interface GroupingHandler extends NotificationListener
{
SimpleString getName();
=
@@ -30,4 +32,6 @@
void send(Response response, int distance) throws Exception;
=
Response receive(Proposal proposal, int distance) throws Exception;
+
+ void onNotification(Notification notification);
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/=
impl/LocalGroupingHandler.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/L=
ocalGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/L=
ocalGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
=
/**
* @author Andy Taylor
@@ -34,8 +35,10 @@
{
private static Logger log =3D Logger.getLogger(LocalGroupingHandler.cla=
ss);
=
- private ConcurrentHashMap map =3D new ConcurrentH=
ashMap();
+ private ConcurrentHashMap map =3D new Concu=
rrentHashMap();
=
+ private HashMap groupMap =3D new HashMap();
+
private final SimpleString name;
=
private final ManagementService managementService;
@@ -59,12 +62,13 @@
{
if(proposal.getProposal() =3D=3D null)
{
- Object original =3D map.get(proposal.getProposalType());
+ SimpleString original =3D map.get(proposal.getProposalType());
return original =3D=3D null?null:new Response(proposal.getProposa=
lType(), original);
}
Response response =3D new Response(proposal.getProposalType(), propo=
sal.getProposal());
if (map.putIfAbsent(response.getResponseType(), response.getChosen()=
) =3D=3D null)
{
+ groupMap.put(response.getChosen(), response.getResponseType());
return response;
}
else
@@ -81,8 +85,8 @@
{
TypedProperties props =3D new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response=
.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Simple=
String)response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (Si=
mpleString)response.getAlternative());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, respons=
e.getOriginal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, res=
ponse.getAlternative());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.=
LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -94,5 +98,19 @@
{
return propose(proposal);
}
+
+ public void onNotification(Notification notification)
+ {
+ if(notification.getType() =3D=3D NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName =3D (SimpleString) notification.getPrope=
rties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ SimpleString val =3D groupMap.get(clusterName);
+ if(val !=3D null)
+ {
+ groupMap.remove(clusterName);
+ map.remove(val);
+ }
+ }
+ }
}
=
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/=
impl/Proposal.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/P=
roposal.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/P=
roposal.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -20,12 +20,12 @@
public class Proposal
{
private final SimpleString proposalType;
- private final Object proposal;
+ private final SimpleString proposal;
=
public static final String PROPOSAL_TYPE_HEADER =3D "_JBM_PROPOSAL_TYPE=
";
public static final String PROPOSAL_HEADER =3D "_JBM_PROPOSAL";
=
- public Proposal(SimpleString proposalType, Object proposal)
+ public Proposal(SimpleString proposalType, SimpleString proposal)
{
this.proposal =3D proposal;
this.proposalType =3D proposalType;
@@ -36,7 +36,7 @@
return proposalType;
}
=
- public Object getProposal()
+ public SimpleString getProposal()
{
return proposal;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/=
impl/RemoteGroupingHandler.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R=
emoteGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R=
emoteGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -50,6 +50,8 @@
=
private int waitTime =3D 1000;
=
+ private HashMap groupMap =3D new HashMap();
+
public RemoteGroupingHandler(final ManagementService managementService,=
final SimpleString name, final SimpleString address)
{
this.name =3D name;
@@ -78,7 +80,7 @@
lock.lock();
TypedProperties props =3D new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, propo=
sal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Sim=
pleString)proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, prop=
osal.getProposal());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingTy=
pe.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
@@ -104,6 +106,7 @@
{
lock.lock();
responses.put(response.getResponseType(), response);
+ groupMap.put(response.getChosen(), response.getResponseType());
sendCondition.signal();
}
finally
@@ -116,7 +119,7 @@
{
TypedProperties props =3D new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal=
.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Simple=
String)proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposa=
l.getProposal());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.=
LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -129,5 +132,18 @@
{
}
=
+ public void onNotification(Notification notification)
+ {
+ if(notification.getType() =3D=3D NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName =3D (SimpleString) notification.getPrope=
rties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ SimpleString val =3D groupMap.get(clusterName);
+ if(val !=3D null)
+ {
+ groupMap.remove(clusterName);
+ responses.remove(val);
+ }
+ }
+ }
}
=
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/=
impl/Response.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R=
esponse.java 2009-10-14 10:45:21 UTC (rev 8107)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R=
esponse.java 2009-10-14 12:23:12 UTC (rev 8108)
@@ -21,18 +21,18 @@
{
private final boolean accepted;
=
- private final Object original;
+ private final SimpleString original;
=
- private final Object alternative;
+ private final SimpleString alternative;
=
private SimpleString responseType;
=
- public Response(SimpleString responseType, Object original)
+ public Response(SimpleString responseType, SimpleString original)
{
this(responseType, original, null);
}
=
- public Response(SimpleString responseType, Object original, Object alte=
rnative)
+ public Response(SimpleString responseType, SimpleString original, Simpl=
eString alternative)
{
this.responseType =3D responseType;
this.accepted =3D alternative =3D=3D null;
@@ -45,17 +45,17 @@
return accepted;
}
=
- public Object getOriginal()
+ public SimpleString getOriginal()
{
return original;
}
=
- public Object getAlternative()
+ public SimpleString getAlternative()
{
return alternative;
}
=
- public Object getChosen()
+ public SimpleString getChosen()
{
return alternative !=3D null?alternative:original;
}
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration=
/cluster/distribution/ClusteredGroupingTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust=
er/distribution/ClusteredGroupingTest.java 2009-10-14 10:45:21 UTC (rev 810=
7)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust=
er/distribution/ClusteredGroupingTest.java 2009-10-14 12:23:12 UTC (rev 810=
8)
@@ -434,34 +434,24 @@
=
verifyReceiveAllInRange(20, 30, 0);
removeConsumer(0);
+ removeConsumer(1);
+ removeConsumer(2);
deleteQueue(0, "queue0");
- try
- {
- sendInRange(0, "queues.testaddress", 30, 31, false, MessageImp=
l.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS=
T);
- }
- try
- {
- sendInRange(1, "queues.testaddress", 31, 32, false, MessageImp=
l.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS=
T);
- }
- try
- {
- sendInRange(2, "queues.testaddress", 32, 33, false, MessageImp=
l.HDR_GROUP_ID, new SimpleString("id1"));
- fail("should throw exception");
- }
- catch (HornetQException e)
- {
- assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS=
T);
- }
+ deleteQueue(1, "queue0");
+ deleteQueue(2, "queue0");
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ addConsumer(3, 0, "queue1", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.H=
DR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.H=
DR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.H=
DR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(30, 50, 3);
System.out.println("*********************************************=
********************************");
}
finally
--===============5057126316595932139==--