[hornetq-commits] JBoss hornetq SVN: r10086 - in trunk: src/main/org/hornetq/core/postoffice/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 29 12:43:46 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-29 12:43:44 -0500 (Wed, 29 Dec 2010)
New Revision: 10086

Modified:
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-598 consider duplicate IDs during move message

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -24,7 +24,6 @@
 import org.hornetq.api.core.PropertyConversionException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
-import org.hornetq.core.client.impl.LargeMessageController;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.protocol.core.impl.PacketImpl;

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -26,17 +26,14 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.AddressManager;
 import org.hornetq.core.postoffice.Binding;
@@ -64,7 +61,6 @@
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUIDGenerator;
@@ -557,25 +553,14 @@
 
       setPagingStore(message);
 
-      Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+      byte[] duplicateIDBytes = message.getDuplicateIDBytes();
 
       DuplicateIDCache cache = null;
-
-      byte[] duplicateIDBytes = null;
-
-      if (duplicateID != null)
+      
+      if (duplicateIDBytes != null)
       {
          cache = getDuplicateIDCache(message.getAddress());
 
-         if (duplicateID instanceof SimpleString)
-         {
-            duplicateIDBytes = ((SimpleString)duplicateID).getData();
-         }
-         else
-         {
-            duplicateIDBytes = (byte[])duplicateID;
-         }
-
          if (cache.contains(duplicateIDBytes))
          {
             if (context.getTransaction() == null)

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.server;
 
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.paging.PagingStore;
@@ -59,4 +58,8 @@
    boolean storeIsPaging();
 
    void encodeMessageIDToBuffer();
+   
+   byte [] getDuplicateIDBytes();
+   
+   Object getDuplicateProperty();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -36,6 +36,7 @@
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HandleStatus;
@@ -981,7 +982,15 @@
          {
             iter.remove();
             deliveringCount.incrementAndGet();
-            move(toAddress, ref);
+            try
+            {
+               move(toAddress, ref);
+            }
+            catch (Exception e)
+            {
+               deliveringCount.decrementAndGet();
+               throw e;
+            }
             return true;
          }
       }
@@ -993,32 +1002,69 @@
       Transaction tx = new TransactionImpl(storageManager);
 
       int count = 0;
-      Iterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (filter == null || filter.match(ref.getMessage()))
+         Iterator<MessageReference> iter = iterator();
+         
+         DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
+   
+         while (iter.hasNext())
          {
+            MessageReference ref = iter.next();
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               boolean ignored = false;
+               
+               deliveringCount.incrementAndGet();
+               count++;
+
+               byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+               if (duplicateBytes != null)
+               {
+                  if (targetDuplicateCache.contains(duplicateBytes))
+                  {
+                     log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
+                     acknowledge(tx, ref);
+                     ignored = true;
+                  }
+               }
+               if (!ignored)
+               {
+                  move(toAddress, tx, ref, false);
+               }
+               iter.remove();
+            }
+         }
+   
+         List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+         for (MessageReference ref : cancelled)
+         {
+            byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+            if (duplicateBytes != null)
+            {
+               if (targetDuplicateCache.contains(duplicateBytes))
+               {
+                  log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored");
+                  continue;
+               }
+            }
+   
             deliveringCount.incrementAndGet();
+            count++;
             move(toAddress, tx, ref, false);
-            iter.remove();
-            count++;
+            acknowledge(tx, ref);
          }
+   
+         tx.commit();
+   
+         return count;
       }
-
-      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
-      for (MessageReference ref : cancelled)
+      catch (Exception e)
       {
-         deliveringCount.incrementAndGet();
-         move(toAddress, tx, ref, false);
-         acknowledge(tx, ref);
-         count++;
+         deliveringCount.addAndGet(-count);
+         throw e;
       }
-
-      tx.commit();
-
-      return count;
    }
 
    public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -14,7 +14,6 @@
 package org.hornetq.core.server.impl;
 
 import java.io.InputStream;
-import java.util.Arrays;
 
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -89,7 +88,7 @@
    {
       super(other);
    }
-   
+
    public boolean isServerMessage()
    {
       return true;
@@ -109,10 +108,10 @@
 
    public synchronized int incrementRefCount() throws Exception
    {
-      refCount ++;
-      
+      refCount++;
+
       if (pagingStore != null)
-      {         
+      {
          if (refCount == 1)
          {
             pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
@@ -213,8 +212,7 @@
    {
       if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
       {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS,
-                           other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+         putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
 
          putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
       }
@@ -269,12 +267,7 @@
    @Override
    public String toString()
    {
-      return "ServerMessage[messageID=" + messageID +
-             ", durable=" +
-             durable +
-             ", address=" +
-             getAddress() +
-             "]";
+      return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
    }
 
    // FIXME - this is stuff that is only used in large messages
@@ -293,5 +286,34 @@
 
       buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID);
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.ServerMessage#getDuplicateIDBytes()
+    */
+   public byte[] getDuplicateIDBytes()
+   {
+      Object duplicateID = getDuplicateProperty();
+
+      if (duplicateID == null)
+      {
+         return null;
+      }
+      else
+      {
+         if (duplicateID instanceof SimpleString)
+         {
+            return ((SimpleString)duplicateID).getData();
+         }
+         else
+         {
+            return (byte[])duplicateID;
+         }
+      }
+   }
    
+   public Object getDuplicateProperty()
+   {
+      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+   }
+
 }

Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -28,6 +28,7 @@
 import org.hornetq.core.management.impl.MBeanInfoHelper;
 import org.hornetq.core.messagecounter.MessageCounter;
 import org.hornetq.core.messagecounter.impl.MessageCounterHelper;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.jms.client.HornetQDestination;
 import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.jms.client.SelectorTranslator;

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -29,6 +29,12 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.JMSFactoryType;
 import org.hornetq.api.jms.management.JMSQueueControl;
@@ -47,6 +53,7 @@
 import org.hornetq.tests.integration.management.ManagementTestBase;
 import org.hornetq.tests.unit.util.InVMContext;
 import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.utils.UUIDGenerator;
 import org.hornetq.utils.json.JSONArray;
 
 /**
@@ -70,6 +77,8 @@
 
    protected HornetQQueue queue;
 
+   protected String queueName;
+
    protected Context context;
 
    // Static --------------------------------------------------------
@@ -98,9 +107,9 @@
       MessageConsumer consumer = JMSUtil.createConsumer(connection, queue);
 
       Assert.assertEquals(1, queueControl.getConsumerCount());
-      
+
       JSONArray jsonArray = new JSONArray(queueControl.listConsumersAsJSON());
-      
+
       assertEquals(1, jsonArray.length());
 
       JMSUtil.sendMessages(queue, 2);
@@ -753,6 +762,236 @@
       serverManager.destroyQueue(otherQueueName);
    }
 
+   public void testMoveMessagesWithDuplicateIDSet() throws Exception
+   {
+      String otherQueueName = RandomUtil.randomString();
+
+      serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+      HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+
+      ClientProducer prod1 = session.createProducer(queue.getAddress());
+      ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+
+         msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+         prod1.send(msg);
+         if (i < 5)
+         {
+            prod2.send(msg);
+         }
+      }
+
+      session.commit();
+
+      JMSQueueControl queueControl = createManagementControl();
+      JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+                                                                                        mbeanServer);
+
+      Assert.assertEquals(10, queueControl.getMessageCount());
+
+      int moved = queueControl.moveMessages(null, otherQueueName);
+
+      assertEquals(10, moved);
+
+      assertEquals(0, queueControl.getDeliveringCount());
+
+      session.start();
+
+      ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+      assertNull(cons1.receiveImmediate());
+
+      cons1.close();
+
+      ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = cons2.receive(10000);
+
+         assertNotNull(msg);
+
+         msg.acknowledge();
+      }
+
+      cons2.close();
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+
+      Assert.assertEquals(0, queueControl.getMessageCount());
+
+      Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+      serverManager.destroyQueue(otherQueueName);
+   }
+
+
+   public void testMoveIndividualMessagesWithDuplicateIDSetUsingI() throws Exception
+   {
+      String otherQueueName = RandomUtil.randomString();
+
+      serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+      HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+
+      ClientProducer prod1 = session.createProducer(queue.getAddress());
+      ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+      
+      String [] ids = new String[10];
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+
+         msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+         msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+         
+         prod1.send(msg);
+         
+         ids[i] = "ID:" + msg.getUserID().toString();
+         if (i < 5)
+         {
+            msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+            prod2.send(msg);
+         }
+      }
+
+      session.commit();
+
+      JMSQueueControl queueControl = createManagementControl();
+      JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+                                                                                        mbeanServer);
+
+      Assert.assertEquals(10, queueControl.getMessageCount());
+
+      for (int i = 0 ; i < 10; i++)
+      {
+         queueControl.moveMessage(ids[i], otherQueueName);
+      }
+
+      assertEquals(0, queueControl.getDeliveringCount());
+
+      session.start();
+
+      ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+      assertNull(cons1.receiveImmediate());
+
+      cons1.close();
+
+      ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = cons2.receive(10000);
+
+         assertNotNull(msg);
+
+         msg.acknowledge();
+      }
+
+      cons2.close();
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+
+      Assert.assertEquals(0, queueControl.getMessageCount());
+
+      Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+      serverManager.destroyQueue(otherQueueName);
+   }
+
+   public void testMoveMessagesWithDuplicateIDSetSingleMessage() throws Exception
+   {
+      String otherQueueName = RandomUtil.randomString();
+
+      serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+      HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+
+      ClientProducer prod1 = session.createProducer(queue.getAddress());
+      ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+
+      ClientMessage msg = session.createMessage(true);
+
+      msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-1"));
+
+      prod1.send(msg);
+      prod2.send(msg);
+
+      JMSQueueControl queueControl = createManagementControl();
+      JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+                                                                                        mbeanServer);
+
+      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, otherQueueControl.getMessageCount());
+
+      int moved = queueControl.moveMessages(null, otherQueueName);
+
+      assertEquals(1, moved);
+
+      assertEquals(0, queueControl.getDeliveringCount());
+
+      session.start();
+
+      ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+      assertNull(cons1.receiveImmediate());
+
+      cons1.close();
+
+      ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+      msg = cons2.receive(10000);
+
+      assertNotNull(msg);
+
+      msg.acknowledge();
+
+      cons2.close();
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+
+      Assert.assertEquals(0, queueControl.getMessageCount());
+
+      Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+      serverManager.destroyQueue(otherQueueName);
+   }
+
    public void testMoveMessageWithUnknownMessageID() throws Exception
    {
       String unknownMessageID = RandomUtil.randomString();
@@ -839,7 +1078,7 @@
       serverManager.start();
       serverManager.activated();
 
-      String queueName = RandomUtil.randomString();
+      queueName = RandomUtil.randomString();
       serverManager.createQueue(false, queueName, null, true, queueName);
       queue = (HornetQQueue)HornetQJMSClient.createQueue(queueName);
    }
@@ -873,7 +1112,8 @@
 
    private Connection createConnection() throws JMSException
    {
-      HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+                                                                                                                new TransportConfiguration(InVMConnectorFactory.class.getName()));
 
       cf.setBlockOnDurableSend(true);
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-12-29 17:43:44 UTC (rev 10086)
@@ -992,6 +992,24 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.ServerMessage#getDuplicateIDBytes()
+       */
+      public byte[] getDuplicateIDBytes()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.ServerMessage#getDuplicateProperty()
+       */
+      public Object getDuplicateProperty()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list