[hornetq-commits] JBoss hornetq SVN: r10650 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri May 13 01:14:48 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-13 01:14:47 -0400 (Fri, 13 May 2011)
New Revision: 10650

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-6466 - improving IO on depaging

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java	2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java	2011-05-13 05:14:47 UTC (rev 10650)
@@ -95,7 +95,7 @@
 
    SequentialFile copy();
    
-   void copyTo(SequentialFile newFileName);
+   void copyTo(SequentialFile newFileName) throws Exception;
 
    void setTimedBuffer(TimedBuffer buffer);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-13 05:14:47 UTC (rev 10650)
@@ -105,35 +105,27 @@
       file.delete();
    }
    
-   public void copyTo(SequentialFile newFileName)
+   public void copyTo(SequentialFile newFileName) throws Exception
    {
-      try
+      log.debug("Copying "  + this + " as " + newFileName);
+      newFileName.open();
+      this.open();
+      
+      
+      ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+      
+      for (;;)
       {
-         log.debug("Copying "  + this + " as " + newFileName);
-         newFileName.open();
-         this.open();
-         
-         
-         ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-         
-         for (;;)
+         buffer.rewind();
+         int size = this.read(buffer);
+         newFileName.writeInternal(buffer);
+         if (size < 10 * 1024)
          {
-            buffer.rewind();
-            int size = this.read(buffer);
-            newFileName.writeInternal(buffer);
-            if (size < 10 * 1024)
-            {
-               break;
-            }
+            break;
          }
-         newFileName.close();
-         this.close();
       }
-      catch (Exception e)
-      {
-         log.warn("Error on copying file " + this + " as " + newFileName, e);
-      }
-      
+      newFileName.close();
+      this.close();
    }
 
    public void position(final long pos) throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-13 05:14:47 UTC (rev 10650)
@@ -307,19 +307,29 @@
       }
       else
       {
-         SequentialFile file = this.file;
-         
-         SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-         
-         file.copyTo(newFile);
-         
-         LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
-         
-         newMessage.linkMessage = null;
-         
-         newMessage.setPaged();
-         
-         return newMessage;
+         try
+         {
+            validateFile();
+            
+            SequentialFile file = this.file;
+            
+            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+            
+            file.copyTo(newFile);
+            
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            
+            newMessage.linkMessage = null;
+            
+            newMessage.setPaged();
+            
+            return newMessage;
+         }
+         catch (Exception e)
+         {
+            log.warn("Error on copying large message this for DLA or Expiry", e);
+            return null;
+         }
       }
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-13 04:10:25 UTC (rev 10649)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-13 05:14:47 UTC (rev 10650)
@@ -120,7 +120,7 @@
    {
       locator.close();
 
-      super.tearDown();
+      //super.tearDown();
    }
 
    public void testPreparePersistent() throws Exception
@@ -3741,6 +3741,163 @@
       }
    }
 
+   public void testExpireLargeMessageOnPaging() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+      AddressSettings dla = new AddressSettings();
+      dla.setMaxDeliveryAttempts(5);
+      dla.setDeadLetterAddress(new SimpleString("DLA"));
+      dla.setExpiryAddress(new SimpleString("DLA"));
+      settings.put(ADDRESS.toString(), dla);
+
+      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+      server.start();
+
+      final int messageSize = 20;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.createQueue("DLA", "DLA");
+
+         PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         pgStoreAddress.startPaging();
+         PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < 500; i++)
+         {
+            log.info("send message #" + i);
+            message = session.createMessage(true);
+
+            message.putStringProperty("id", "str" + i);
+            
+            message.setExpiration(System.currentTimeMillis() + 2000);
+
+            if (i % 2 == 0)
+            {
+               message.setBodyInputStream(createFakeLargeStream(messageSize));
+            }
+            else
+            {
+               byte bytes[] = new byte[messageSize];
+               for (int s = 0 ; s < bytes.length; s++)
+               {
+                  bytes[s] = getSamplebyte(s);
+               }
+               message.getBodyBuffer().writeBytes(bytes);
+            }
+
+            producer.send(message);
+            
+            if ((i + 1) % 2 == 0)
+            {
+               session.commit();
+               if (i < 400)
+               {
+                  pgStoreAddress.forceAnotherPage();
+               }
+            }
+         }
+
+         session.commit();
+         
+         sf.close();
+         
+         locator.close();
+         
+         server.stop();
+         
+         Thread.sleep(3000);
+         
+         server.start();
+         
+         locator = createInVMNonHALocator();
+         
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession(false, false);
+         
+         session.start();
+         
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+         
+         assertNull(cons.receive(1000));
+         
+         cons.close();
+         
+         cons = session.createConsumer("DLA");
+
+         for (int i = 0; i < 500; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+         }
+         
+         assertNull(cons.receiveImmediate());
+         
+         session.commit();
+         
+         cons.close();
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         
+         pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         
+         while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+         {
+            Thread.sleep(50);
+         }
+         
+         assertFalse(pgStoreAddress.isPaging());
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list