[hornetq-commits] JBoss hornetq SVN: r10752 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat May 28 12:34:42 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-28 12:34:41 -0400 (Sat, 28 May 2011)
New Revision: 10752

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
Log:
https://issues.jboss.org/browse/HORNETQ-705 - fixing queue iterations with multiple priorities

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -44,8 +44,6 @@
 
    void activate();
 
-   void setQueue(Queue queue);
-
    void setNotificationService(NotificationService notificationService);
 
    RemotingConnection getForwardingConnection();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -64,6 +64,8 @@
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
+   
+   private static final boolean isTrace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
    
@@ -77,7 +79,7 @@
 
    private final SimpleString name;
 
-   private Queue queue;
+   private final Queue queue;
 
    protected final Executor executor;
 
@@ -222,6 +224,8 @@
          }
       }
       
+      log.info("Bridge " + this.name + " being stopped");
+      
       stopping = true;
 
       executor.execute(new StopRunnable());
@@ -266,11 +270,6 @@
       return queue;
    }
 
-   public void setQueue(final Queue queue)
-   {
-      this.queue = queue;
-   }
-
    public Filter getFilter()
    {
       return filter;
@@ -367,20 +366,25 @@
       {
          return HandleStatus.NO_MATCH;
       }
-
+      
       synchronized (this)
       {
          if (!active)
          {
+            log.debug(name + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
             return HandleStatus.BUSY;
          }
 
+		   if (isTrace)
+		   {
+		      log.trace("Bridge " + name + " is handling reference=" + ref); 
+		   }
          ref.handled();
 
          ServerMessage message = ref.getMessage();
 
          refs.add(ref);
-
+         
          message = beforeForward(message);
 
          SimpleString dest;
@@ -419,11 +423,13 @@
 
    public void connectionFailed(final HornetQException me, boolean failedOver)
    {
+      log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
       fail(false);
    }
 
    public void beforeReconnect(final HornetQException exception)
    {
+      log.warn(name + "::Connection failed before reconnect ", exception);
       fail(true);
    }
 
@@ -454,8 +460,11 @@
       // we want to cancel all unacked refs so they get resent
       // duplicate detection will ensure no dups are routed on the other side
 
+      log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect);
+      
       if (session.getConnection().isDestroyed())
       {
+         log.debug(name + "::Connection is destroyed, active = false now");
          active = false;
       }
 
@@ -467,7 +476,7 @@
             {
                synchronized (this)
                {
-                  active = false;
+                  log.debug(name + "::Connection is destroyed, active = false now");
                }
 
                cancelRefs();
@@ -476,6 +485,7 @@
             {
                afterConnect();
 
+               log.debug(name + "::After reconnect, setting active=true now");
                active = true;
 
                if (queue != null)
@@ -650,6 +660,8 @@
                {
                   return;
                }
+               
+               log.debug("Closing Session for bridge " + BridgeImpl.this.name);
 
                if (session != null)
                {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -16,8 +16,12 @@
 import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
 import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -29,7 +33,6 @@
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -272,6 +275,8 @@
       {
          serverLocator.removeClusterTopologyListener(this);
       }
+      
+      log.debug("Cluster connection being stopped for node" + nodeUUID);
 
       synchronized (this)
       {
@@ -357,6 +362,8 @@
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
          serverLocator.setConfirmationWindowSize(0);
+         serverLocator.setBlockOnDurableSend(false);
+         serverLocator.setBlockOnNonDurableSend(false);
 
          if(retryInterval > 0)
          {
@@ -388,6 +395,7 @@
 
    public synchronized void nodeDown(final String nodeID)
    {
+      log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID);
       if (nodeID.equals(nodeUUID.toString()))
       {
          return;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -22,7 +22,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 
-
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
@@ -93,7 +92,7 @@
    // regular client listeners to be notified of cluster topology changes.
    // they correspond to regular clients using a HA ServerLocator
    private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-   
+
    // cluster connections listeners to be notified of cluster topology changes
    // they correspond to cluster connections on *other nodes connected to this one*
    private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
@@ -103,6 +102,7 @@
    private volatile ServerLocatorInternal backupServerLocator;
 
    private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+
    private Executor executor;
 
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -173,11 +173,10 @@
       {
          announceNode();
       }
-      
+
       started = true;
    }
 
-
    public synchronized void stop() throws Exception
    {
       if (!started)
@@ -200,7 +199,7 @@
             clusterConnection.stop();
             managementService.unregisterCluster(clusterConnection.getName().toString());
          }
-         
+
          clusterConnectionListeners.clear();
          clientListeners.clear();
          clusterConnections.clear();
@@ -216,7 +215,7 @@
 
       bridges.clear();
 
-      if(backupServerLocator != null)
+      if (backupServerLocator != null)
       {
          backupServerLocator.close();
          backupServerLocator = null;
@@ -238,7 +237,7 @@
       }
 
       boolean removed = topology.removeMember(nodeID);
-      
+
       if (removed)
       {
 
@@ -262,23 +261,23 @@
       TopologyMember member = new TopologyMember(connectorPair);
       boolean updated = topology.addMember(nodeID, member);
 
-      if(!updated)
+      if (!updated)
       {
          return;
       }
-      
+
       for (ClusterTopologyListener listener : clientListeners)
       {
          listener.nodeUP(nodeID, member.getConnector(), last);
       }
 
-
       for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
          listener.nodeUP(nodeID, member.getConnector(), last);
       }
 
-      //if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster connections.
+      // if this is a node being announced, we are hearing it directly from the node's CM, so we need to inform our
+      // cluster connections.
       if (nodeAnnounce)
       {
          for (ClusterConnection clusterConnection : clusterConnections.values())
@@ -287,7 +286,7 @@
          }
       }
    }
-   
+
    public boolean isStarted()
    {
       return started;
@@ -313,8 +312,7 @@
       return clusterConnections.get(name.toString());
    }
 
-   public void addClusterTopologyListener(final ClusterTopologyListener listener,
-                                                     final boolean clusterConnection)
+   public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
       synchronized (this)
       {
@@ -333,7 +331,7 @@
    }
 
    public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
-                                                       final boolean clusterConnection)
+                                                          final boolean clusterConnection)
    {
       if (clusterConnection)
       {
@@ -349,9 +347,9 @@
    {
       return topology;
    }
-   
+
    // backup node becomes live
-  public synchronized void activate()
+   public synchronized void activate()
    {
       if (backup)
       {
@@ -360,7 +358,7 @@
          String nodeID = server.getNodeID().toString();
 
          TopologyMember member = topology.getMember(nodeID);
-         //we swap the topology backup now = live
+         // we swap the topology backup now = live
          if (member != null)
          {
             member.getConnector().a = member.getConnector().b;
@@ -368,9 +366,9 @@
             member.getConnector().b = null;
          }
 
-         if(backupServerLocator != null)
+         if (backupServerLocator != null)
          {
-            //todo we could use the topology of this to preempt it arriving from the cc
+            // todo we could use the topology of this to preempt it arriving from the cc
             try
             {
                backupServerLocator.close();
@@ -434,7 +432,7 @@
    public void announceBackup() throws Exception
    {
       List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
-      if(!configs.isEmpty())
+      if (!configs.isEmpty())
       {
          ClusterConnectionConfiguration config = configs.get(0);
 
@@ -442,8 +440,7 @@
 
          if (connector == null)
          {
-            log.warn("No connecor with name '" + config.getConnectorName() +
-                     "'. backup cannot be announced.");
+            log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced.");
             return;
          }
          announceBackup(config, connector);
@@ -469,11 +466,13 @@
       {
          if (backup)
          {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()));
+            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
+                                                                                                 cc.getConnector()));
          }
          else
          {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null));
+            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
+                                                                                                 null));
          }
 
          topology.addMember(nodeID, member);
@@ -482,11 +481,11 @@
       {
          if (backup)
          {
-           // pair.b = cc.getConnector();
+            // pair.b = cc.getConnector();
          }
          else
          {
-           // pair.a = cc.getConnector();
+            // pair.a = cc.getConnector();
          }
       }
 
@@ -496,7 +495,7 @@
       {
          listener.nodeUP(nodeID, member.getConnector(), false);
       }
-      
+
       for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
          listener.nodeUP(nodeID, member.getConnector(), false);
@@ -685,6 +684,8 @@
       serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
       serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
       serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
+      serverLocator.setBlockOnDurableSend(false);
+      serverLocator.setBlockOnNonDurableSend(false);
       clusterLocators.add(serverLocator);
       Bridge bridge = new BridgeImpl(serverLocator,
                                      nodeUUID,
@@ -720,7 +721,7 @@
          managementService.unregisterBridge(name);
       }
    }
-   
+
    private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
    {
       if (config.getName() == null)
@@ -746,10 +747,10 @@
          return;
       }
 
-
-      if(clusterConnections.containsKey(config.getName()))
+      if (clusterConnections.containsKey(config.getName()))
       {
-         log.warn("Cluster Configuration  '" + config.getConnectorName() + "' already exists. The cluster connection will not be deployed.", new Exception ("trace"));
+         log.warn("Cluster Configuration  '" + config.getConnectorName() +
+                  "' already exists. The cluster connection will not be deployed.", new Exception("trace"));
          return;
       }
 
@@ -788,7 +789,8 @@
       }
       else
       {
-         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
+         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
+                                                                                  : null;
 
          clusterConnection = new ClusterConnectionImpl(tcConfigs,
                                                        connector,
@@ -816,8 +818,8 @@
       clusterConnections.put(config.getName(), clusterConnection);
 
       clusterConnection.start();
-      
-      if(backup)
+
+      if (backup)
       {
          announceBackup(config, connector);
       }
@@ -860,13 +862,15 @@
                ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
                if (backupSessionFactory != null)
                {
-                  backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+                  backupSessionFactory.getConnection()
+                                      .getChannel(0, -1)
+                                      .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
                   log.info("backup announced");
                }
             }
             catch (Exception e)
             {
-               log.warn("Unable to announce backup", e); 
+               log.warn("Unable to announce backup", e);
             }
          }
       });
@@ -892,7 +896,8 @@
       }
       return transformer;
    }
-   //for testing
+
+   // for testing
    public void clear()
    {
       bridges.clear();
@@ -904,7 +909,7 @@
          }
          catch (Exception e)
          {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            e.printStackTrace();
          }
       }
       clusterConnections.clear();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -393,7 +393,7 @@
       {
          return;
       }
-
+      
       queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
 
       concurrentQueue.add(ref);
@@ -2055,7 +2055,7 @@
       }
       catch (Exception e)
       {
-         QueueImpl.log.warn("Unable to decrement reference counting", e);
+	      QueueImpl.log.warn("Unable to decrement reference counting", e);
       }
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -43,6 +43,8 @@
 
    private int highestPriority = -1;
 
+   private int lastPriority = -1;
+
    public PriorityLinkedListImpl(final int priorities)
    {
       this.priorities = priorities;
@@ -54,14 +56,25 @@
          levels[i] = new LinkedListImpl<T>();
       }
    }
-   
-   private void checkHighest(int priority)
+
+   private void checkHighest(final int priority)
    {
+      if (lastPriority != priority || priority > highestPriority)
+      {
+         lastPriority = priority;
+         if (lastReset == Integer.MAX_VALUE)
+         {
+            lastReset = 0;
+         }
+         else
+         {
+            lastReset++;
+         }
+      }
+
       if (priority > highestPriority)
       {
          highestPriority = priority;
-         
-         lastReset++;
       }
    }
 
@@ -150,19 +163,20 @@
    {
       private int index;
 
-      private LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
+      private final LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
 
       private LinkedListIterator<T> lastIter;
-      
+
       private int resetCount = lastReset;
-      
+
       volatile boolean closed = false;
 
       PriorityLinkedListIterator()
       {
          index = levels.length - 1;
       }
-      
+
+      @Override
       protected void finalize()
       {
          close();
@@ -184,7 +198,7 @@
          {
             closed = true;
             lastIter = null;
-   
+
             for (LinkedListIterator<T> iter : cachedIters)
             {
                if (iter != null)
@@ -194,13 +208,13 @@
             }
          }
       }
-      
+
       private void checkReset()
       {
-         if (lastReset > resetCount)
+         if (lastReset != resetCount)
          {
             index = highestPriority;
-            
+
             resetCount = lastReset;
          }
       }
@@ -208,7 +222,7 @@
       public boolean hasNext()
       {
          checkReset();
-         
+
          while (index >= 0)
          {
             lastIter = cachedIters[index];
@@ -255,10 +269,17 @@
          }
 
          lastIter.remove();
-         
-         if (index == highestPriority && levels[index].size() == 0)
+
+         // The next statement would be the equivalent of:
+         // if (index == highestPriority && levels[index].size() == 0)
+         // However, we have to keep checking all the previous levels;
+         // otherwise we would cache a max that no longer exists,
+         // which would eventually make hasNext() return false
+         // (a bug).
+         // Part of the fix for HORNETQ-705
+         for (int i = index; i >= 0 && levels[index].size() == 0; i--)
          {
-            highestPriority--;
+            highestPriority = i;
          }
 
          size--;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -17,6 +17,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -42,6 +45,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.impl.BridgeImpl;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.LinkedListIterator;
@@ -947,25 +951,257 @@
 
          try
          {
-             server0.stop();
+            server0.stop();
          }
-         catch(Exception ignored)
+         catch (Exception ignored)
          {
-            
+
          }
 
          try
          {
-             server1.stop();
+            server1.stop();
          }
-         catch(Exception ignored)
+         catch (Exception ignored)
          {
-            
+
          }
       }
 
    }
 
+   public void testSawtoothLoad() throws Exception
+   {
+      Map<String, Object> server0Params = new HashMap<String, Object>();
+      HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+      server0.getConfiguration().setThreadPoolMaxSize(10);
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      addTargetParameters(server1Params);
+      HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+      server1.getConfiguration().setThreadPoolMaxSize(10);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      final TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+      final TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+      connectors.put(server1tc.getName(), server1tc);
+
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+      ArrayList<String> staticConnectors = new ArrayList<String>();
+      staticConnectors.add(server1tc.getName());
+
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                        queueName0,
+                                                                        forwardAddress,
+                                                                        null,
+                                                                        null,
+                                                                        1000,
+                                                                        1d,
+                                                                        -1,
+                                                                        false,
+                                                                        0,
+                                                                        HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                        staticConnectors,
+                                                                        false,
+                                                                        ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                        ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      try
+      {
+         server1.start();
+         server0.start();
+
+         final int numMessages = 10000;
+
+         final int totalrepeats = 10;
+
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         // We shouldn't have more than 10K messages pending
+         final Semaphore semop = new Semaphore(10000);
+
+         class ConsumerThread extends Thread
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server1tc);
+
+                  ClientSessionFactory sf = locator.createSessionFactory();
+
+                  ClientSession session = sf.createSession(false, false);
+
+                  session.start();
+
+                  ClientConsumer consumer = session.createConsumer(queueName1);
+
+                  for (int i = 0; i < numMessages; i++)
+                  {
+                     ClientMessage message = consumer.receive(5000);
+
+                     Assert.assertNotNull(message);
+
+                     message.acknowledge();
+                     semop.release();
+                     if (i % 1000 == 0)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  session.commit();
+
+                  session.close();
+                  sf.close();
+                  locator.close();
+
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         };
+
+         class ProducerThread extends Thread
+         {
+            final int nmsg;
+            ProducerThread(int nmsg)
+            {
+               this.nmsg = nmsg;
+            }
+            public void run()
+            {
+               ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc);
+               
+               locator.setBlockOnDurableSend(false);
+               locator.setBlockOnNonDurableSend(false);
+
+               ClientSessionFactory sf = null;
+
+               ClientSession session = null;
+
+               ClientProducer producer = null;
+
+               try
+               {
+                  sf = locator.createSessionFactory();
+
+                  session = sf.createSession(false, true, true);
+
+                  producer = session.createProducer(new SimpleString(testAddress));
+
+                  for (int i = 0; i < nmsg; i++)
+                  {
+                     assertEquals(0, errors.get());
+                     ClientMessage message = session.createMessage(true);
+
+                     message.putIntProperty("seq", i);
+                     
+                     
+                     if (i % 100 == 0)
+                     {
+                        message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9));
+                     }
+                     else
+                     {
+                        message.setPriority((byte)5);
+                     }
+
+                     message.getBodyBuffer().writeBytes(new byte[50]);
+
+                     producer.send(message);
+                     assertTrue(semop.tryAcquire(1, 10, TimeUnit.SECONDS));
+                  }
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace(System.out);
+                  errors.incrementAndGet();
+               }
+               finally
+               {
+                  try
+                  {
+                     session.close();
+                     sf.close();
+                     locator.close();
+                  }
+                  catch (Exception ignored)
+                  {
+                     errors.incrementAndGet();
+                  }
+               }
+            }
+         }
+
+         for (int repeat = 0 ; repeat < totalrepeats; repeat++)
+         {
+            System.out.println("Repeat " + repeat);
+            ArrayList<Thread> threads = new ArrayList<Thread>();
+   
+            threads.add(new ConsumerThread());
+            threads.add(new ProducerThread(numMessages / 2));
+            threads.add(new ProducerThread(numMessages / 2));
+
+            for (Thread t : threads)
+            {
+               t.start();
+            }
+   
+            for (Thread t : threads)
+            {
+               t.join();
+            }
+   
+            assertEquals(0, errors.get());
+         }
+      }
+      finally
+      {
+         try
+         {
+            server0.stop();
+         }
+         catch (Exception ignored)
+         {
+
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Exception ignored)
+         {
+
+         }
+      }
+
+   }
+
    public void testBridgeWithPaging() throws Exception
    {
       HornetQServer server0 = null;
@@ -1142,11 +1378,11 @@
          ArrayList<String> staticConnectors = new ArrayList<String>();
          staticConnectors.add(server1tc.getName());
          BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
-                                                                                                        // forwarding
-                                                                                                        // address to
-                                                                                                        // use messages'
-                                                                                                        // original
-                                                                                                        // address
+                                                                           // forwarding
+                                                                           // address to
+                                                                           // use messages'
+                                                                           // original
+                                                                           // address
                                                                            null,
                                                                            null,
                                                                            1000,

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -513,7 +513,7 @@
          throw new IllegalArgumentException("No sf at " + node);
       }
 
-      ClientSession session = sf.createSession(false, true, true);
+      ClientSession session = sf.createSession(false, false, false);
 
       try
       {
@@ -531,7 +531,14 @@
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
             producer.send(message);
+            
+            if (i % 100 == 0)
+            {
+               session.commit();
+            }
          }
+         
+         session.commit();
       }
       finally
       {
@@ -1328,6 +1335,7 @@
          configuration.setJournalFileSize(100 * 1024);
          configuration.setJournalType(getDefaultJournalType());
          configuration.setSharedStore(sharedStorage);
+         configuration.setThreadPoolMaxSize(10);
          if (sharedStorage)
          {
             // Shared storage will share the node between the backup and live node

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -210,6 +210,56 @@
       verifyNotReceive(0, 1, 2, 3, 4);
    }
 
+   /** Round-robin test with a larger message count: 1000 messages are sent via index 0 and verified across consumers 0-4. */
+   public void testBasicRoundRobinManyMessages() throws Exception
+   {
+      setupCluster();
+
+      startServers();
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+      // Create "queue0" on "queues.testaddress" for each index 0-4 (presumably the node index; null/false look like filter/durable - confirm).
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(2, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+      // One consumer per index on "queue0"; the first two arguments are equal on every line (presumably consumer id and node - confirm).
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+      addConsumer(3, 3, "queue0", null);
+      addConsumer(4, 4, "queue0", null);
+      // First wait uses (1, 1, true) everywhere; presumably one local binding with one consumer - confirm waitForBindings' parameters.
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+      // Diagnostic output only: print the cluster description of servers[0] through servers[4] to stdout.
+      System.out.println(clusterDescription(servers[0]));
+      System.out.println(clusterDescription(servers[1]));
+      System.out.println(clusterDescription(servers[2]));
+      System.out.println(clusterDescription(servers[3]));
+      System.out.println(clusterDescription(servers[4]));
+      // Second wait uses (4, 4, false); presumably the four bindings/consumers learned from the other nodes - confirm.
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
+      waitForBindings(1, "queues.testaddress", 4, 4, false);
+      waitForBindings(2, "queues.testaddress", 4, 4, false);
+      waitForBindings(3, "queues.testaddress", 4, 4, false);
+      waitForBindings(4, "queues.testaddress", 4, 4, false);
+      // 1000 is passed both to send(...) here and to the round-robin verification below.
+      send(0, "queues.testaddress", 1000, true, null);
+
+      verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4);
+
+      verifyNotReceive(0, 1, 2, 3, 4); // presumably asserts nothing further arrives on consumers 0-4 - confirm
+   }
+
    public void testRoundRobinMultipleQueues() throws Exception
    {
       SymmetricClusterTest.log.info("starting");

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java	2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java	2011-05-28 16:34:41 UTC (rev 10752)
@@ -16,6 +16,7 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.LinkedListIterator;
 import org.hornetq.utils.PriorityLinkedListImpl;
 
@@ -77,9 +78,9 @@
    protected Wibble y;
 
    protected Wibble z;
-   
+
    private PriorityLinkedListImpl<Wibble> list;
-   
+
    protected abstract PriorityLinkedListImpl<Wibble> getList();
 
    public void setUp() throws Exception
@@ -133,7 +134,7 @@
       Wibble w = list.poll();
       Assert.assertEquals(a, w);
       Assert.assertTrue(list.isEmpty());
-      
+
       assertEquals(0, list.size());
    }
 
@@ -144,7 +145,7 @@
       list.addHead(c, 0);
       list.addHead(d, 0);
       list.addHead(e, 0);
-      
+
       assertEquals(5, list.size());
 
       Assert.assertEquals(e, list.poll());
@@ -153,7 +154,7 @@
       Assert.assertEquals(b, list.poll());
       Assert.assertEquals(a, list.poll());
       Assert.assertNull(list.poll());
-      
+
       assertEquals(0, list.size());
    }
 
@@ -172,11 +173,11 @@
       Assert.assertEquals(d, list.poll());
       Assert.assertEquals(e, list.poll());
       Assert.assertNull(list.poll());
-      
+
       assertEquals(0, list.size());
 
    }
-   
+
    public void testAddLastAndFirst() throws Exception
    {
       list.addTail(a, 0);
@@ -189,7 +190,7 @@
       list.addTail(h, 0);
       list.addTail(i, 0);
       list.addTail(j, 0);
-      
+
       list.addHead(k, 0);
       list.addHead(l, 0);
       list.addHead(m, 0);
@@ -200,7 +201,7 @@
       list.addHead(r, 0);
       list.addHead(s, 0);
       list.addHead(t, 0);
-      
+
       assertEquals(t, list.poll());
       assertEquals(s, list.poll());
       assertEquals(r, list.poll());
@@ -211,7 +212,7 @@
       assertEquals(m, list.poll());
       assertEquals(l, list.poll());
       assertEquals(k, list.poll());
-      
+
       assertEquals(a, list.poll());
       assertEquals(b, list.poll());
       assertEquals(c, list.poll());
@@ -223,7 +224,7 @@
       assertEquals(i, list.poll());
       assertEquals(j, list.poll());
    }
-   
+
    public void testAddLastAndFirstWithIterator() throws Exception
    {
       list.addTail(a, 0);
@@ -236,7 +237,7 @@
       list.addTail(h, 0);
       list.addTail(i, 0);
       list.addTail(j, 0);
-      
+
       list.addHead(k, 0);
       list.addHead(l, 0);
       list.addHead(m, 0);
@@ -247,9 +248,9 @@
       list.addHead(r, 0);
       list.addHead(s, 0);
       list.addHead(t, 0);
-      
+
       LinkedListIterator<Wibble> iter = list.iterator();
-           
+
       assertTrue(iter.hasNext());
       assertEquals(t, iter.next());
       assertTrue(iter.hasNext());
@@ -270,7 +271,7 @@
       assertEquals(l, iter.next());
       assertTrue(iter.hasNext());
       assertEquals(k, iter.next());
- 
+
       assertTrue(iter.hasNext());
       assertEquals(a, iter.next());
       assertTrue(iter.hasNext());
@@ -438,7 +439,7 @@
       Assert.assertEquals(a, list.poll());
 
       Assert.assertNull(list.poll());
-      
+
       assertEquals(0, list.size());
 
    }
@@ -567,8 +568,7 @@
       w = (Wibble)iter.next();
       Assert.assertEquals("z", w.s);
       assertFalse(iter.hasNext());
-      
-      
+
       iter = list.iterator();
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
@@ -617,7 +617,7 @@
       iter.remove();
 
       Assert.assertEquals(23, list.size());
-      
+
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("i", w.s);
@@ -671,7 +671,6 @@
       Assert.assertEquals("z", w.s);
       iter.remove();
 
-
       iter = list.iterator();
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
@@ -739,78 +738,76 @@
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("y", w.s);
-      
+
       assertFalse(iter.hasNext());
       assertFalse(iter.hasNext());
 
-      //Test the elements added after iter created are seen
-      
+      // Test the elements added after iter created are seen
+
       list.addTail(a, 4);
       list.addTail(b, 4);
-      
+
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("a", w.s);
-      
+
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("b", w.s);
-      
-      assertFalse(iter.hasNext());      
-      
+
+      assertFalse(iter.hasNext());
+
       list.addTail(c, 4);
       list.addTail(d, 4);
-      
+
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("c", w.s);
-      
+
       assertTrue(iter.hasNext());
       w = (Wibble)iter.next();
       Assert.assertEquals("d", w.s);
-      
+
       assertFalse(iter.hasNext());
-      
 
    }
-   
+
    public void testIteratorPicksUpHigherPriorities()
    {
       list.addTail(a, 4);
       list.addTail(b, 4);
       list.addTail(c, 4);
-     
+
       LinkedListIterator<Wibble> iter = list.iterator();
-      
+
       assertTrue(iter.hasNext());
       assertEquals(a, iter.next());
-      
+
       assertTrue(iter.hasNext());
       assertEquals(b, iter.next());
-      
+
       list.addTail(d, 5);
       list.addTail(e, 5);
-      
+
       assertTrue(iter.hasNext());
       assertEquals(d, iter.next());
-      
+
       assertTrue(iter.hasNext());
       assertEquals(e, iter.next());
-      
+
       assertTrue(iter.hasNext());
       assertEquals(c, iter.next());
-      
+
       list.addTail(f, 1);
       list.addTail(g, 9);
-      
+
       assertTrue(iter.hasNext());
       assertEquals(g, iter.next());
-      
+
       assertTrue(iter.hasNext());
       assertEquals(f, iter.next());
    }
 
-
    public void testClear()
    {
       list.addTail(a, 0);
@@ -829,6 +826,59 @@
       Assert.assertNull(list.poll());
    }
 
+   public void testMixupIterator() // an iterator must still return an element added at a priority it has already passed
+   {
+      list.addTail(c, 5);
+      list.addTail(a, 4);
+      list.addTail(b, 4);
+      // The iterator is created before d is added further down.
+      LinkedListIterator<Wibble> iter = list.iterator();
+      // Expected order: c (priority 5) first, then a and b (priority 4) in the order they were added.
+      assertTrue(iter.hasNext());
+      assertEquals(c, iter.next());
+      assertTrue(iter.hasNext());
+      assertEquals(a, iter.next());
+      assertTrue(iter.hasNext());
+      assertEquals(b, iter.next());
+      list.addTail(d, 5); // added at priority 5 after the iterator has already returned the priority-4 elements
+      assertTrue(iter.hasNext());
+      assertEquals(d, iter.next());
+   }
+   /** Iterates priorities 5, 2 and 0 with remove() after each next(), then checks the same iterator returns a later add at priority 2. */
+   public void testMixupIterator2()
+   {
+      list.addTail(c, 5);
+      
+      list.addTail(k, 0);
+
+      list.addTail(a, 2);
+      list.addTail(b, 2);
+      // Elements are added out of priority order on purpose; the assertions expect c (5), a and b (2), then k (0).
+      LinkedListIterator<Wibble> iter = list.iterator();
+
+      assertTrue(iter.hasNext());
+      assertEquals(c, iter.next());
+      iter.remove();
+      
+      assertTrue(iter.hasNext());
+      assertEquals(a, iter.next());
+      iter.remove();
+      
+      assertTrue(iter.hasNext());
+      assertEquals(b, iter.next());
+      iter.remove();
+      
+      assertTrue(iter.hasNext());
+      assertEquals(k, iter.next());
+      iter.remove();
+      // All four elements have been returned and removed via the iterator; d is added at priority 2, above the last one returned (k, 0).
+      list.addTail(d, 2);
+      
+      assertTrue(iter.hasNext());
+      assertEquals(d, iter.next());
+      iter.remove();
+   }
+
    class Wibble
    {
       String s;



More information about the hornetq-commits mailing list