[hornetq-commits] JBoss hornetq SVN: r8109 - in branches/hornetq_grouping: src/main/org/hornetq/core/server/group/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 14 08:59:52 EDT 2009
Author: ataylor
Date: 2009-10-14 08:59:50 -0400 (Wed, 14 Oct 2009)
New Revision: 8109
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
timeout test
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-14 12:59:50 UTC (rev 8109)
@@ -495,7 +495,7 @@
}
else
{
- groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress());
+ groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
}
log.info("deploying grouping handler: " + groupingHandler);
postOffice.setGroupingHandler(groupingHandler);
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-14 12:59:50 UTC (rev 8109)
@@ -48,15 +48,16 @@
private final Condition sendCondition = lock.newCondition();
- private int waitTime = 1000;
+ private final int timeout;
private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
- public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+ public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, int timeout)
{
this.name = name;
this.address = address;
this.managementService = managementService;
+ this.timeout = timeout;
}
public SimpleString getName()
@@ -86,7 +87,7 @@
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
managementService.sendNotification(notification);
- sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
+ sendCondition.await(timeout, TimeUnit.MILLISECONDS);
response = responses.get(proposal.getProposalType());
}
finally
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14 12:59:50 UTC (rev 8109)
@@ -29,7 +29,6 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
@@ -452,9 +451,13 @@
session.close();
}
-
protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
{
+ setUpGroupHandler(type, node, 5000);
+ }
+
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
+ {
GroupingHandler groupingHandler;
if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
{
@@ -462,11 +465,16 @@
}
else
{
- groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+ groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), timeout);
}
this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
}
+ protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
+ {
+ this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+ }
+
protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
{
sendInRange(node, address, 0, numMessages, durable, filterVal);
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 12:59:50 UTC (rev 8109)
@@ -14,7 +14,11 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.management.Notification;
import org.hornetq.utils.SimpleString;
/**
@@ -79,6 +83,100 @@
}
}
+ public void testGroupingTimeout() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupHandler(new GroupingHandler()
+ {
+ public SimpleString getName()
+ {
+ return null;
+ }
+
+ public Response propose(Proposal proposal) throws Exception
+ {
+ return null;
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ return null;
+ }
+
+ public void onNotification(Notification notification)
+ {
+
+ }
+ }, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ try
+ {
+ sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ fail("should timeout");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
public void testGroupingSendTo2queues() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
More information about the hornetq-commits
mailing list