[hornetq-commits] JBoss hornetq SVN: r11926 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 21 23:23:08 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-21 23:23:07 -0500 (Wed, 21 Dec 2011)
New Revision: 11926

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-7809 - fixing large message copy on DLQ / Expiry Queue

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -1746,25 +1746,6 @@
 
       messageEncoding.decode(buff);
 
-      if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
-      {
-         long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
-         LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
-         if (originalMessage == null)
-         {
-            // this could happen if the message was deleted but the file still exists as the file still being used
-            originalMessage = createLargeMessage();
-            originalMessage.setDurable(true);
-            originalMessage.setMessageID(originalMessageID);
-            messages.put(originalMessageID, originalMessage);
-         }
-
-         originalMessage.incrementDelayDeletionCount();
-
-         largeMessage.setLinkedMessage(originalMessage);
-      }
       return largeMessage;
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -48,8 +48,6 @@
    // Attributes ----------------------------------------------------
 
    private final JournalStorageManager storageManager;
-
-   private LargeServerMessage linkMessage;
    
    private long pendingRecordID = -1;
    
@@ -80,7 +78,6 @@
    private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
       super(copy, properties);
-      linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
       bodySize = copy.bodySize;
@@ -191,7 +188,7 @@
          checkDelete();
       }
    }
-
+   
    @Override
    public BodyEncoder getBodyEncoder() throws HornetQException
    {
@@ -203,27 +200,19 @@
    {
       if (getRefCount() <= 0)
       {
-         if (linkMessage != null)
+         if (LargeServerMessageImpl.isTrace)
          {
-            // This file is linked to another message, deleting the reference where it belongs on this case
-            linkMessage.decrementDelayDeletionCount();
+            LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
          }
-         else
-         {
-            if (LargeServerMessageImpl.isTrace)
-            {
-               LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
-            }
 
-            try
-            {
-               deleteFile();
-            }
-            catch (Exception e)
-            {
-               LargeServerMessageImpl.log.error(e.getMessage(), e);
-            }
+         try
+         {
+            deleteFile();
          }
+         catch (Exception e)
+         {
+            LargeServerMessageImpl.log.error(e.getMessage(), e);
+         }
       }
    }
 
@@ -319,15 +308,9 @@
    {
       long idToUse = messageID;
 
-      if (linkMessage != null)
-      {
-         idToUse = linkMessage.getMessageID();
-      }
-
       SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
 
-      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                               : (LargeServerMessageImpl)linkMessage,
+      ServerMessage newMessage = new LargeServerMessageImpl(this,
                                                             properties,
                                                             newfile,
                                                             messageID);
@@ -338,60 +321,34 @@
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
-      if (!paged)
+      try
       {
-         incrementDelayDeletionCount();
-   
-         long idToUse = messageID;
-   
-         if (linkMessage != null)
-         {
-            idToUse = linkMessage.getMessageID();
-         }
-   
-         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-   
-         ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                                  : (LargeServerMessageImpl)linkMessage,
-                                                               properties,
-                                                               newfile,
-                                                               newID);
+         validateFile();
+         
+         SequentialFile file = this.file;
+         
+         SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+         
+         file.copyTo(newFile);
+         
+         LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
+         
          return newMessage;
       }
-      else
+      catch (Exception e)
       {
-         try
-         {
-            validateFile();
-            
-            SequentialFile file = this.file;
-            
-            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-            
-            file.copyTo(newFile);
-            
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
-            
-            newMessage.linkMessage = null;
-            
-            newMessage.setPaged();
-            
-            return newMessage;
-         }
-         catch (Exception e)
-         {
-            log.warn("Error on copying large message this for DLA or Expiry", e);
-            return null;
-         }
-         finally
-         {
-            releaseResources();
-         }
+         log.warn("Error on copying large message " + this + " for DLA or Expiry", e);
+         return null;
       }
+      finally
+      {
+         releaseResources();
+      }
    }
 
-   public SequentialFile getFile()
+   public SequentialFile getFile() throws Exception
    {
+      validateFile();
       return file;
    }
 
@@ -463,32 +420,6 @@
 	  }
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
-    */
-   public void setLinkedMessage(final LargeServerMessage message)
-   {
-      if (file != null)
-      {
-         // Sanity check.. it shouldn't happen
-         throw new IllegalStateException("LargeMessage file was already set");
-      }
-
-      linkMessage = message;
-
-      file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
-      try
-      {
-         openFile();
-         bodySize = file.size();
-         closeFile();
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException("could not setup linked file", e);
-      }
-   }
-
    // Inner classes -------------------------------------------------
 
    class DecodingContext implements BodyEncoder

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.server;
 
+
 /**
  * A LargeMessage
  *
@@ -26,9 +27,6 @@
 {
    void addBytes(byte[] bytes) throws Exception;
 
-   /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
-   void setLinkedMessage(LargeServerMessage message);
-   
    void setPendingRecordID(long pendingRecordID);
    
    long getPendingRecordID();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -691,6 +691,9 @@
          serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
          serverLocator.setCallTimeout(callTimeout);
+         
+         // No producer flow control on the bridges, as we don't want to lock the queues
+         serverLocator.setProducerWindowSize(-1);
 
          if (retryInterval > 0)
          {
@@ -930,6 +933,9 @@
       targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
       targetLocator.setMinLargeMessageSize(minLargeMessageSize);
 
+      // No producer flow control on the bridges, as we don't want to lock the queues
+      targetLocator.setProducerWindowSize(-1);
+
       targetLocator.setAfterConnectionInternalListener(this);
 
       targetLocator.setNodeID(nodeId);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -930,7 +930,7 @@
                int localChunkLen = 0;
 
                localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
+               
                HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
 
                context.encode(bodyBuffer, localChunkLen);

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test sends regular and large messages in page mode, lets them expire to the expiry queue, then forces them on to the DLQ, verifying the message bodies at each stage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+   final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+   final SimpleString DLQ = new SimpleString("my-DLQ");
+
+   final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+   final int messageSize = 10 * 1024;
+
+   // it has to be an even number
+   final int numberOfMessages = 50;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testExpiryMessagesThenDLQ() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(50 * 1024);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setExpiryAddress(EXPIRY);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(50 * 1024);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+      server.start();
+
+      try
+      {
+
+         server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+         server.createQueue(DLQ, DLQ, null, true, false);
+
+         server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true, 0);
+
+         byte bufferSample[] = new byte[messageSize];
+
+         for (int i = 0; i < bufferSample.length; i++)
+         {
+            bufferSample[i] = getSamplebyte(i);
+         }
+
+         ClientProducer producer = session.createProducer(MY_QUEUE);
+
+         long timeToExpiry = System.currentTimeMillis() + 1000;
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+
+            message.putIntProperty("count", i);
+
+            // Alternate message types: even counts are regular messages, odd counts are large messages
+            if (i % 2 == 0)
+            {
+               message.putBooleanProperty("tst-large", false);
+               message.getBodyBuffer().writeBytes(bufferSample);
+            }
+            else
+            {
+               message.putBooleanProperty("tst-large", true);
+               message.setBodyInputStream(createFakeLargeStream(messageSize));
+            }
+
+            message.setExpiration(timeToExpiry);
+
+            producer.send(message);
+         }
+
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+
+         Thread.sleep(1500);
+
+         ClientConsumer cons = session.createConsumer(MY_QUEUE);
+         assertNull(cons.receive(1000));
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+         
+         
+         // Consume half of the messages to make sure all the messages are paging (on the second try)
+         for (int i = 0 ; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            msg.acknowledge();
+         }
+         
+         session.commit();
+         
+         cons.close();
+
+         for (int rep = 0; rep < 6; rep++)
+         {
+            cons = session.createConsumer(EXPIRY);
+            session.start();
+
+            System.out.println("Trying " + rep);
+            for (int i = 0; i < numberOfMessages / 2; i++)
+            {
+               ClientMessage message = cons.receive(5000);
+               assertNotNull(message);
+
+               if (i % 10 == 0)
+               {
+                  System.out.println("Received " + i);
+               }
+               
+               for (int location = 0; location < messageSize; location++)
+               {
+                  assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+               }
+               message.acknowledge();
+            }
+
+            session.rollback();
+
+            cons.close();
+
+            session.close();
+            sf.close();
+
+            if (rep == 0)
+            {
+               // restart the server at the first try
+               server.stop();
+               server.start();
+            }
+
+            sf = locator.createSessionFactory();
+            session = sf.createSession(false, false);
+            session.start();
+         }
+         
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+         assertNull(cons.receiveImmediate());
+
+         cons.close();
+
+         session.close();
+         sf.close();
+
+         for (int rep = 0; rep < 2; rep++)
+         {
+            sf = locator.createSessionFactory();
+
+            session = sf.createSession(false, false);
+
+            cons = session.createConsumer(DLQ);
+
+            session.start();
+
+            for (int i = 0; i < numberOfMessages / 2; i++)
+            {
+               ClientMessage message = cons.receive(5000);
+               assertNotNull(message);
+
+               if (i % 10 == 0)
+               {
+                  System.out.println("Received " + i);
+               }
+
+               for (int location = 0; location < messageSize; location++)
+               {
+                  assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+               }
+               message.acknowledge();
+            }
+            if (rep == 0)
+            {
+               session.rollback();
+               session.close();
+               sf.close();
+               server.stop();
+               server.start();
+            }
+         }
+
+         session.commit();
+
+         assertNull(cons.receiveImmediate());
+
+         session.close();
+         sf.close();
+         locator.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-12-22 04:23:07 UTC (rev 11926)
@@ -96,9 +96,6 @@
       }
       locators.clear();
       super.tearDown();
-//      checkFreePort(5445);
-//      checkFreePort(5446);
-//      checkFreePort(5447);
       if (InVMRegistry.instance.size() > 0)
       {
          fail("InVMREgistry size > 0");



More information about the hornetq-commits mailing list