[hornetq-commits] JBoss hornetq SVN: r10283 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/postoffice/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 2 23:52:51 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-02 23:52:51 -0500 (Wed, 02 Mar 2011)
New Revision: 10283

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - duplicate cache detection changes between on redistribution

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -52,6 +52,9 @@
    private static final Logger log = Logger.getLogger(MessageImpl.class);
 
    public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
+   
+   // used by the bridges to set duplicates 
+   public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP");
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -23,6 +23,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
@@ -32,6 +33,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.StorageManager;
@@ -78,6 +80,8 @@
 
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
 
+   private static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
+
    private final AddressManager addressManager;
 
    private final QueueFactory queueFactory;
@@ -466,9 +470,9 @@
 
    public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
-      
+
       addressSettingsRepository.clearCache();
-      
+
       Binding binding = addressManager.removeBinding(uniqueName);
 
       if (binding == null)
@@ -491,7 +495,7 @@
       {
          managementService.unregisterDivert(uniqueName);
       }
-      
+
       if (binding.getType() != BindingType.DIVERT)
       {
          TypedProperties props = new TypedProperties();
@@ -544,7 +548,10 @@
       route(message, new RoutingContextImpl(tx), direct);
    }
 
-   public void route(final ServerMessage message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception
+   public void route(final ServerMessage message,
+                     final Transaction tx,
+                     final boolean direct,
+                     final boolean rejectDuplicates) throws Exception
    {
       route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
    }
@@ -554,7 +561,10 @@
       route(message, context, direct, true);
    }
 
-   public void route(final ServerMessage message, final RoutingContext context, final boolean direct, final boolean rejectDuplicates) throws Exception
+   public void route(final ServerMessage message,
+                     final RoutingContext context,
+                     final boolean direct,
+                     boolean rejectDuplicates) throws Exception
    {
       // Sanity check
       if (message.getRefCount() > 0)
@@ -566,55 +576,13 @@
 
       setPagingStore(message);
 
-      byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+      AtomicBoolean startedTX = new AtomicBoolean(false);
 
-      DuplicateIDCache cache = null;
-      
-      boolean isDuplicate = false;
-      
-      if (duplicateIDBytes != null)
+      if (!checkDuplicateID(message, context, rejectDuplicates, startedTX))
       {
-         cache = getDuplicateIDCache(message.getAddress());
-         
-         isDuplicate = cache.contains(duplicateIDBytes);
-
-         if (rejectDuplicates && isDuplicate)
-         {
-            StringBuffer warnMessage = new StringBuffer();
-            warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
-            for (SimpleString key : message.getPropertyNames())
-            {
-               warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
-            }
-            PostOfficeImpl.log.warn(warnMessage.toString());
-
-            if (context.getTransaction() != null) 
-            {
-               context.getTransaction().markAsRollbackOnly(null);
-            }
-
-            return;
-         }
+         return;
       }
 
-      boolean startedTx = false;
-
-      if (cache != null && !isDuplicate)
-      {
-         if (context.getTransaction() == null)
-         {
-            // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
-
-            Transaction newTX = new TransactionImpl(storageManager);
-
-            context.setTransaction(newTX);
-
-            startedTx = true;
-         }
-
-         cache.addToCache(duplicateIDBytes, context.getTransaction());
-      }
-
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
 
       if (bindings != null)
@@ -657,7 +625,7 @@
          processRoute(message, context, direct);
       }
 
-      if (startedTx)
+      if (startedTX.get())
       {
          context.getTransaction().commit();
       }
@@ -842,20 +810,20 @@
          processRoute(message, context, false);
       }
    }
-   
-   
+
    private class PageDelivery extends TransactionOperationAbstract
    {
       private Set<Queue> queues = new HashSet<Queue>();
-      
+
       public void addQueues(List<Queue> queueList)
       {
          queues.addAll(queueList);
       }
-      
+
       public void afterCommit(Transaction tx)
       {
-         // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is going towards the queues
+         // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is
+         // going towards the queues
          // The queue will try to depage case it's empty
          for (Queue queue : queues)
          {
@@ -870,7 +838,7 @@
       {
          return Collections.emptyList();
       }
-      
+
    }
 
    private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
@@ -878,57 +846,56 @@
       final List<MessageReference> refs = new ArrayList<MessageReference>();
 
       Transaction tx = context.getTransaction();
-      
-      
-      for (Map.Entry<SimpleString, RouteContextList> entry: context.getContexListing().entrySet())
+
+      for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet())
       {
          PagingStore store = pagingManager.getPageStore(entry.getKey());
-         
+
          if (store.page(message, context, entry.getValue()))
          {
-            
+
             // We need to kick delivery so the Queues may check for the cursors case they are empty
             schedulePageDelivery(tx, entry);
             continue;
          }
-   
+
          for (Queue queue : entry.getValue().getNonDurableQueues())
          {
             MessageReference reference = message.createReference(queue);
-   
+
             refs.add(reference);
-   
+
             if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
             {
                Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-   
+
                reference.setScheduledDeliveryTime(scheduledDeliveryTime);
             }
-   
+
             message.incrementRefCount();
          }
-   
+
          Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
-   
+
          while (iter.hasNext())
          {
             Queue queue = iter.next();
-   
+
             MessageReference reference = message.createReference(queue);
-   
+
             refs.add(reference);
-   
+
             if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
             {
                Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-   
+
                reference.setScheduledDeliveryTime(scheduledDeliveryTime);
             }
-   
+
             if (message.isDurable())
             {
                int durableRefCount = message.incrementDurableRefCount();
-   
+
                if (durableRefCount == 1)
                {
                   if (tx != null)
@@ -940,18 +907,18 @@
                      storageManager.storeMessage(message);
                   }
                }
-   
+
                if (tx != null)
                {
                   storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
-   
+
                   tx.setContainsPersistent();
                }
                else
                {
                   storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                }
-   
+
                if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
                {
                   if (tx != null)
@@ -964,11 +931,11 @@
                   }
                }
             }
-   
+
             message.incrementRefCount();
          }
       }
-      
+
       if (tx != null)
       {
          tx.addOperation(new AddOperation(refs));
@@ -1010,7 +977,7 @@
             tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
             tx.addOperation(delivery);
          }
-         
+
          delivery.addQueues(entry.getValue().getDurableQueues());
          delivery.addQueues(entry.getValue().getNonDurableQueues());
       }
@@ -1019,19 +986,19 @@
 
          List<Queue> durableQueues = entry.getValue().getDurableQueues();
          List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-         
+
          final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
-         
+
          queues.addAll(durableQueues);
          queues.addAll(nonDurableQueues);
 
          storageManager.afterCompleteOperations(new IOAsyncTask()
          {
-            
+
             public void onError(int errorCode, String errorMessage)
             {
             }
-            
+
             public void done()
             {
                for (Queue queue : queues)
@@ -1044,6 +1011,94 @@
       }
    }
 
+   private boolean checkDuplicateID(final ServerMessage message,
+                                    final RoutingContext context,
+                                    boolean rejectDuplicates,
+                                    AtomicBoolean startedTX) throws Exception
+   {
+      // Check the DuplicateCache for the Bridge first
+
+      Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+      if (bridgeDup != null)
+      {
+         // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
+         byte[] bridgeDupBytes = (byte[])bridgeDup;
+
+         DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
+
+         if (cacheBridge.contains(bridgeDupBytes))
+         {
+            StringBuffer warnMessage = new StringBuffer();
+            warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
+            for (SimpleString key : message.getPropertyNames())
+            {
+               warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
+            }
+            PostOfficeImpl.log.warn(warnMessage.toString());
+
+            return false;
+         }
+         else
+         {
+            if (context.getTransaction() == null)
+            {
+               context.setTransaction(new TransactionImpl(storageManager));
+               startedTX.set(true);
+            }
+         }
+
+         cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
+
+         message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+      }
+
+      byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+
+      DuplicateIDCache cache = null;
+
+      boolean isDuplicate = false;
+
+      if (duplicateIDBytes != null)
+      {
+         cache = getDuplicateIDCache(message.getAddress());
+
+         isDuplicate = cache.contains(duplicateIDBytes);
+
+         if (rejectDuplicates && isDuplicate)
+         {
+            StringBuffer warnMessage = new StringBuffer();
+            warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
+            for (SimpleString key : message.getPropertyNames())
+            {
+               warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
+            }
+            PostOfficeImpl.log.warn(warnMessage.toString());
+
+            if (context.getTransaction() != null)
+            {
+               context.getTransaction().markAsRollbackOnly(null);
+            }
+
+            return false;
+         }
+      }
+
+      if (cache != null && !isDuplicate)
+      {
+         if (context.getTransaction() == null)
+         {
+            // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+            context.setTransaction(new TransactionImpl(storageManager));
+
+            startedTX.set(true);
+         }
+
+         cache.addToCache(duplicateIDBytes, context.getTransaction());
+      }
+
+      return true;
+   }
+
    /**
     * @param refs
     */
@@ -1211,7 +1266,7 @@
             message.decrementRefCount();
          }
       }
-      
+
       public List<MessageReference> getRelatedMessageReferences()
       {
          return refs;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -55,6 +55,11 @@
  */
 public interface HornetQServer extends HornetQComponent
 {
+   
+   void setIdentity(String identity);
+   
+   String getIdentity();
+   
    Configuration getConfiguration();
 
    RemotingService getRemotingService();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -32,6 +32,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
@@ -336,7 +337,7 @@
 
          bb.putLong(message.getMessageID());
 
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
+         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
       }
 
       if (transformer != null)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -219,6 +219,9 @@
    private volatile GroupingHandler groupingHandler;
    
    private NodeManager nodeManager;
+   
+   // Used to identify the server on tests... useful on debugging testcases
+   private String identity;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -779,6 +782,16 @@
    // -----------------------------------------------------------
 
    
+   public void setIdentity(String identity)
+   {
+      this.identity = identity;
+   }
+   
+   public String getIdentity()
+   {
+      return identity;
+   }
+   
    public ScheduledExecutorService getScheduledPool()
    {
       return scheduledPool;
@@ -1730,8 +1743,6 @@
                                      transformer,
                                      postOffice,
                                      storageManager);
-      // pagingManager,
-      // storageManager);
 
       Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
 
@@ -1864,6 +1875,11 @@
          }
       }
    }
+   
+   public String toString()
+   {
+      return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
+   }
 
    // Inner classes
    // --------------------------------------------------------------------------------

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -748,7 +748,8 @@
          {
             
             ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
+            
+            
             if (message == null)
             {
                ClusterTestBase.log.info("*** dumping consumers:");
@@ -758,6 +759,13 @@
                Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
             }
 
+            Set<SimpleString> names = message.getPropertyNames();
+            for (SimpleString name : names)
+            {
+               assertFalse("Property starting with _HQ_ROUTE_TO what could be dangerous on resending it", name.toString().startsWith("_HQ_ROUTE_TO"));
+            }
+
+
             if (ack)
             {
                message.acknowledge();
@@ -1769,9 +1777,10 @@
    {
       for (int node : nodes)
       {
-         ClusterTestBase.log.info("starting server " + node);
-
+         servers[node].setIdentity("server " + node);
+         ClusterTestBase.log.info("starting server " + servers[node]);
          servers[node].start();
+         ClusterTestBase.log.info("started server " + servers[node]);
 
          ClusterTestBase.log.info("started server " + node);
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-03 04:52:51 UTC (rev 10283)
@@ -104,6 +104,7 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
+
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);
@@ -445,6 +446,73 @@
 
    }
 
+   public void testBackAndForth2() throws Exception
+   {
+      for (int i = 0; i < 10; i++)
+      {
+         setupCluster(false);
+
+         startServers(0, 1);
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+
+         final String ADDRESS = "queues.testaddress";
+         final String QUEUE = "queue0";
+
+         createQueue(0, ADDRESS, QUEUE, null, false);
+         createQueue(1, ADDRESS, QUEUE, null, false);
+
+         addConsumer(0, 0, QUEUE, null);
+
+         waitForBindings(0, ADDRESS, 1, 1, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+
+         waitForBindings(0, ADDRESS, 1, 0, false);
+         waitForBindings(1, ADDRESS, 1, 1, false);
+
+         send(1, ADDRESS, 20, false, null);
+
+         waitForMessages(0, ADDRESS, 20);
+
+         removeConsumer(0);
+
+         waitForBindings(0, ADDRESS, 1, 0, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+
+         waitForBindings(0, ADDRESS, 1, 0, false);
+         waitForBindings(1, ADDRESS, 1, 0, false);
+
+         addConsumer(1, 1, QUEUE, null);
+
+         waitForMessages(1, ADDRESS, 20);
+         waitForMessages(0, ADDRESS, 0);
+
+         waitForBindings(0, ADDRESS, 1, 1, false);
+         waitForBindings(1, ADDRESS, 1, 0, false);
+
+         removeConsumer(1);
+
+         addConsumer(0, 0, QUEUE, null);
+         
+         waitForMessages(1, ADDRESS, 0);
+         waitForMessages(0, ADDRESS, 20);
+         
+         removeConsumer(0);
+         addConsumer(1, 1, QUEUE, null);
+         
+         waitForMessages(1, ADDRESS, 20);
+         waitForMessages(0, ADDRESS, 0);
+         
+         
+         verifyReceiveAll(20, 1);
+
+         stop();
+         start();
+      }
+
+   }
+
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
    {
       setupCluster(false);



More information about the hornetq-commits mailing list