From do-not-reply at jboss.org Wed Oct 14 08:23:13 2009 Content-Type: multipart/mixed; boundary="===============5057126316595932139==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r8108 - in branches/hornetq_grouping: src/main/org/hornetq/core/server/group and 2 other directories. Date: Wed, 14 Oct 2009 08:23:12 -0400 Message-ID: <200910141223.n9ECNCPL001527@svn01.web.mwc.hst.phx2.redhat.com> --===============5057126316595932139== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: ataylor Date: 2009-10-14 08:23:12 -0400 (Wed, 14 Oct 2009) New Revision: 8108 Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Post= OfficeImpl.java branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupin= gHandler.java branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Lo= calGroupingHandler.java branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Pr= oposal.java branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Re= moteGroupingHandler.java branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Re= sponse.java branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluste= r/distribution/ClusteredGroupingTest.java Log: a few tweaks Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/im= pl/PostOfficeImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Pos= tOfficeImpl.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/Pos= tOfficeImpl.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -698,6 +698,7 @@ public void setGroupingHandler(GroupingHandler groupingHandler) { groupingGroupingHandler =3D groupingHandler; + managementService.addNotificationListener(groupingGroupingHandler); } = public GroupingHandler getGroupingHandler() Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/= GroupingHandler.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupi= ngHandler.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Groupi= ngHandler.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -15,11 +15,13 @@ import org.hornetq.utils.SimpleString; import org.hornetq.core.server.group.impl.Proposal; import org.hornetq.core.server.group.impl.Response; +import org.hornetq.core.management.NotificationListener; +import org.hornetq.core.management.Notification; = /** * @author Andy Taylor */ -public interface GroupingHandler +public interface GroupingHandler extends NotificationListener { SimpleString getName(); = @@ -30,4 +32,6 @@ void send(Response response, int distance) throws Exception; = Response receive(Proposal proposal, int distance) throws Exception; + + void onNotification(Notification notification); } Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/= impl/LocalGroupingHandler.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/L= ocalGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/L= ocalGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.HashMap; = /** * @author Andy Taylor @@ -34,8 +35,10 @@ { private static Logger log =3D Logger.getLogger(LocalGroupingHandler.cla= ss); = - private ConcurrentHashMap map =3D new ConcurrentH= ashMap(); + private ConcurrentHashMap map =3D new Concu= rrentHashMap(); = + private HashMap groupMap =3D new HashMap(); + private final SimpleString name; = private final ManagementService managementService; @@ -59,12 +62,13 @@ { if(proposal.getProposal() =3D=3D null) { - Object original =3D map.get(proposal.getProposalType()); + SimpleString original =3D map.get(proposal.getProposalType()); return original =3D=3D null?null:new Response(proposal.getProposa= lType(), original); } Response response =3D new Response(proposal.getProposalType(), propo= sal.getProposal()); if (map.putIfAbsent(response.getResponseType(), response.getChosen()= ) =3D=3D null) { + groupMap.put(response.getChosen(), response.getResponseType()); return response; } else @@ -81,8 +85,8 @@ { TypedProperties props =3D new TypedProperties(); props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response= .getResponseType()); - props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Simple= String)response.getOriginal()); - props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (Si= mpleString)response.getAlternative()); + props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, respons= e.getOriginal()); + props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, res= ponse.getAlternative()); props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.= LOCAL_QUEUE_INDEX); props.putStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance); @@ -94,5 +98,19 @@ { return propose(proposal); } + + public void onNotification(Notification notification) + { + if(notification.getType() =3D=3D NotificationType.BINDING_REMOVED) + { + SimpleString clusterName =3D (SimpleString) notification.getPrope= rties().getProperty(ManagementHelper.HDR_CLUSTER_NAME); + SimpleString val =3D groupMap.get(clusterName); + if(val !=3D null) + { + groupMap.remove(clusterName); + map.remove(val); + } + } + } } = Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/= impl/Proposal.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/P= roposal.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/P= roposal.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -20,12 +20,12 @@ public class Proposal { private final SimpleString proposalType; - private final Object proposal; + private final SimpleString proposal; = public static final String PROPOSAL_TYPE_HEADER =3D "_JBM_PROPOSAL_TYPE= "; public static final String PROPOSAL_HEADER =3D "_JBM_PROPOSAL"; = - public Proposal(SimpleString proposalType, Object proposal) + public Proposal(SimpleString proposalType, SimpleString proposal) { this.proposal =3D proposal; this.proposalType =3D proposalType; @@ -36,7 +36,7 @@ return proposalType; } = - public Object getProposal() + public SimpleString getProposal() { return proposal; } Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/= impl/RemoteGroupingHandler.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R= emoteGroupingHandler.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R= emoteGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -50,6 +50,8 @@ = private int waitTime =3D 1000; = + private HashMap groupMap =3D new HashMap(); + public RemoteGroupingHandler(final ManagementService managementService,= final SimpleString name, final SimpleString address) { this.name =3D name; @@ -78,7 +80,7 @@ lock.lock(); TypedProperties props =3D new TypedProperties(); props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, propo= sal.getProposalType()); - props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Sim= pleString)proposal.getProposal()); + props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, prop= osal.getProposal()); props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingTy= pe.LOCAL_QUEUE_INDEX); props.putStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0); @@ -104,6 +106,7 @@ { lock.lock(); responses.put(response.getResponseType(), response); + groupMap.put(response.getChosen(), response.getResponseType()); sendCondition.signal(); } finally @@ -116,7 +119,7 @@ { TypedProperties props =3D new TypedProperties(); props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal= .getProposalType()); - props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (Simple= String)proposal.getProposal()); + props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposa= l.getProposal()); props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.= LOCAL_QUEUE_INDEX); props.putStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance); @@ -129,5 +132,18 @@ { } = + public void onNotification(Notification notification) + { + if(notification.getType() =3D=3D NotificationType.BINDING_REMOVED) + { + SimpleString clusterName =3D (SimpleString) notification.getPrope= rties().getProperty(ManagementHelper.HDR_CLUSTER_NAME); + SimpleString val =3D groupMap.get(clusterName); + if(val !=3D null) + { + groupMap.remove(clusterName); + responses.remove(val); + } + } + } } = Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/= impl/Response.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R= esponse.java 2009-10-14 10:45:21 UTC (rev 8107) +++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/R= esponse.java 2009-10-14 12:23:12 UTC (rev 8108) @@ -21,18 +21,18 @@ { private final boolean accepted; = - private final Object original; + private final SimpleString original; = - private final Object alternative; + private final SimpleString alternative; = private SimpleString responseType; = - public Response(SimpleString responseType, Object original) + public Response(SimpleString responseType, SimpleString original) { this(responseType, original, null); } = - public Response(SimpleString responseType, Object original, Object alte= rnative) + public Response(SimpleString responseType, SimpleString original, Simpl= eString alternative) { this.responseType =3D responseType; this.accepted =3D alternative =3D=3D null; @@ -45,17 +45,17 @@ return accepted; } = - public Object getOriginal() + public SimpleString getOriginal() { return original; } = - public Object getAlternative() + public SimpleString getAlternative() { return alternative; } = - public Object getChosen() + public SimpleString getChosen() { return alternative !=3D null?alternative:original; } Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration= /cluster/distribution/ClusteredGroupingTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusteredGroupingTest.java 2009-10-14 10:45:21 UTC (rev 810= 7) +++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusteredGroupingTest.java 2009-10-14 12:23:12 UTC (rev 810= 8) @@ -434,34 +434,24 @@ = verifyReceiveAllInRange(20, 30, 0); removeConsumer(0); + removeConsumer(1); + removeConsumer(2); deleteQueue(0, "queue0"); - try - { - sendInRange(0, "queues.testaddress", 30, 31, false, MessageImp= l.HDR_GROUP_ID, new SimpleString("id1")); - fail("should throw exception"); - } - catch (HornetQException e) - { - assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS= T); - } - try - { - sendInRange(1, "queues.testaddress", 31, 32, false, MessageImp= l.HDR_GROUP_ID, new SimpleString("id1")); - fail("should throw exception"); - } - catch (HornetQException e) - { - assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS= T); - } - try - { - sendInRange(2, "queues.testaddress", 32, 33, false, MessageImp= l.HDR_GROUP_ID, new SimpleString("id1")); - fail("should throw exception"); - } - catch (HornetQException e) - { - assertEquals(e.getCode(), HornetQException.QUEUE_DOES_NOT_EXIS= T); - } + deleteQueue(1, "queue0"); + deleteQueue(2, "queue0"); + createQueue(0, "queues.testaddress", "queue1", null, false); + addConsumer(3, 0, "queue1", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, false); + waitForBindings(2, "queues.testaddress", 1, 1, false); + + sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.H= DR_GROUP_ID, new SimpleString("id1")); + + sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.H= DR_GROUP_ID, new SimpleString("id1")); + + sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.H= DR_GROUP_ID, new SimpleString("id1")); + verifyReceiveAllInRange(30, 50, 3); System.out.println("*********************************************= ********************************"); } finally --===============5057126316595932139==--