[hornetq-commits] JBoss hornetq SVN: r8109 - in branches/hornetq_grouping: src/main/org/hornetq/core/server/group/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 14 08:59:52 EDT 2009


Author: ataylor
Date: 2009-10-14 08:59:50 -0400 (Wed, 14 Oct 2009)
New Revision: 8109

Modified:
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
timeout test

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-10-14 12:59:50 UTC (rev 8109)
@@ -495,7 +495,7 @@
       }
       else
       {
-         groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress());
+         groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
       }
       log.info("deploying grouping handler: " + groupingHandler);
       postOffice.setGroupingHandler(groupingHandler);

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-10-14 12:59:50 UTC (rev 8109)
@@ -48,15 +48,16 @@
 
    private final Condition sendCondition = lock.newCondition();
 
-   private int waitTime = 1000;
+   private final int timeout;
 
    private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
 
-   public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, int timeout)
    {
       this.name = name;
       this.address = address;
       this.managementService = managementService;
+      this.timeout = timeout;
    }
 
    public SimpleString getName()
@@ -86,7 +87,7 @@
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
          Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
          managementService.sendNotification(notification);
-         sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
+         sendCondition.await(timeout, TimeUnit.MILLISECONDS);
          response = responses.get(proposal.getProposalType());
       }
       finally

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 12:59:50 UTC (rev 8109)
@@ -29,7 +29,6 @@
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.FailoverManagerImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
@@ -452,9 +451,13 @@
       session.close();
    }
 
-
    protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type,  int node)
    {
+      setUpGroupHandler(type, node, 5000);
+   }
+
+   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type,  int node, int timeout)
+   {
       GroupingHandler groupingHandler;
       if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
       {
@@ -462,11 +465,16 @@
       }
       else
       {
-         groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+         groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), timeout);
       }
       this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
    }
 
+   protected void setUpGroupHandler(GroupingHandler groupingHandler,  int node)
+   {
+      this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+   }
+
    protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
    {
       sendInRange(node, address, 0, numMessages, durable, filterVal);

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 12:23:12 UTC (rev 8108)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 12:59:50 UTC (rev 8109)
@@ -14,7 +14,11 @@
 
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.management.Notification;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -79,6 +83,100 @@
       }
    }
 
+   public void testGroupingTimeout() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      try
+      {
+         setUpGroupHandler(new GroupingHandler()
+         {
+            public SimpleString getName()
+            {
+               return null;
+            }
+
+            public Response propose(Proposal proposal) throws Exception
+            {
+               return null;
+            }
+
+            public void proposed(Response response) throws Exception
+            {
+
+            }
+
+            public void send(Response response, int distance) throws Exception
+            {
+
+            }
+
+            public Response receive(Proposal proposal, int distance) throws Exception
+            {
+               return null;
+            }
+
+            public void onNotification(Notification notification)
+            {
+               
+            }
+         }, 0);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+         /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
+         waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+         try
+         {
+            sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+            fail("should timeout");
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+         }
+
+
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         closeAllConsumers();
+
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
+      }
+   }
+
    public void testGroupingSendTo2queues() throws Exception
    {
       setupServer(0, isFileStorage(), isNetty());



More information about the hornetq-commits mailing list