[hornetq-commits] JBoss hornetq SVN: r9807 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 21 17:02:06 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-21 17:02:05 -0400 (Thu, 21 Oct 2010)
New Revision: 9807

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Live cursors update

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-21 21:02:05 UTC (rev 9807)
@@ -105,8 +105,9 @@
    {
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
+      size.set((int)file.size());
       // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
-      ByteBuffer buffer2 = ByteBuffer.allocateDirect((int)file.size());
+      ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
       
       file.position(0);
       file.read(buffer2);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-21 21:02:05 UTC (rev 9807)
@@ -20,9 +20,7 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -472,9 +470,32 @@
                         firstPageId = fileId;
                      }
                   }
-
-                  if (numberOfPages != 0)
+                  
+                  if (currentPageId != 0)
                   {
+                     currentPage = createPage(currentPageId);
+                     currentPage.open();
+                     
+                     List<PagedMessage> messages = currentPage.read();
+                     
+                     LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+                     
+                     for (PagedMessage msg : messages)
+                     {
+                        msg.initMessage(storageManager);
+                        pageCache.addLiveMessage(msg);
+                     }
+                     
+                     currentPage.setLiveCache(pageCache);
+                     
+                     currentPageSize.set(currentPage.getSize());
+                     
+                     cursorProvider.addPageCache(pageCache);
+                  }
+                  
+                  if (currentPage != null)
+                  {
+                     
                      startPaging();
                   }
                }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-21 21:02:05 UTC (rev 9807)
@@ -134,16 +134,7 @@
    }
 
 
-   /**
-    * @return
-    * @throws Exception
-    */
-   private PageCursor createNonPersistentCursor() throws Exception
-   {
-      return lookupCursorProvider().createNonPersistentCursor();
-   }
 
-
    public void testReadNextPage() throws Exception
    {
 
@@ -160,15 +151,6 @@
       assertNull(cache);
    }
 
-
-   /**
-    * @return
-    * @throws Exception
-    */
-   private PageCursorProvider lookupCursorProvider() throws Exception
-   {
-      return lookupPageStore(ADDRESS).getCursorProvier();
-   }
    
    
    public void testRestart() throws Exception
@@ -348,7 +330,7 @@
 
       pageStore.startPaging();
 
-      final int NUM_MESSAGES = 1000;
+      final int NUM_MESSAGES = 100;
       
       final int messageSize = 1024 * 1024;
       
@@ -378,13 +360,82 @@
          
          assertNotNull(readMessage);
          
-         cursor.ack(readMessage.a);
-         
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
          
          assertNull(cursor.moveNext());
       }
       
+      server.stop();
+      
+      OperationContextImpl.clearContext();
+      
+      createServer();
+      
+      pageStore = lookupPageStore(ADDRESS);
+      
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      
+      for (int i = 0; i < NUM_MESSAGES * 2; i++)
+      {
+         if (i % 100 == 0) System.out.println("Paged " + i);
+
+         if (i >= NUM_MESSAGES)
+         {
+            
+            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+   
+            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+            msg.putIntProperty("key", i);
+            
+            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+   
+            Assert.assertTrue(pageStore.page(msg));
+         }
+         
+         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+         
+         assertNotNull(readMessage);
+         
+         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+      }
+
+      server.stop();
+      
+      OperationContextImpl.clearContext();
+      
+      createServer();
+      
+      pageStore = lookupPageStore(ADDRESS);
+      
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+      
+      for (int i = 0; i < NUM_MESSAGES * 3; i++)
+      {
+         if (i % 100 == 0) System.out.println("Paged " + i);
+
+         if (i >= NUM_MESSAGES * 2)
+         {
+            
+            HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+   
+            ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+            msg.putIntProperty("key", i);
+            
+            msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+   
+            Assert.assertTrue(pageStore.page(msg));
+         }
+         
+         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+         
+         assertNotNull(readMessage);
+         
+         cursor.ack(readMessage.a);
+         
+         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+      }
+
+      
    }
    
    
@@ -455,36 +506,6 @@
    }
 
 
-   /**
-    * @param storage
-    * @param pageStore
-    * @param pgParameter
-    * @param start
-    * @param NUM_MESSAGES
-    * @param messageSize
-    * @throws Exception
-    */
-   private void pgMessages(StorageManager storage,
-                           PagingStoreImpl pageStore,
-                           PageTransactionInfo pgParameter,
-                           int start,
-                           final int NUM_MESSAGES,
-                           final int messageSize) throws Exception
-   {
-      List<ServerMessage> messages = new ArrayList<ServerMessage>();
-      
-      for (int i = start ; i < start + NUM_MESSAGES; i++)
-      {
-         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-         ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
-         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-         msg.putIntProperty("key", i);
-         messages.add(msg);
-      }
-      
-      pageStore.page(messages, pgParameter.getTransactionID());
-   }
-   
    public void testCleanupScenarios() throws Exception
    {
       // Validate the pages are being cleared (with multiple cursors)
@@ -587,7 +608,7 @@
       server.start();
       
       Thread.sleep(1000);
-      assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
       
       
 
@@ -631,7 +652,7 @@
       
       OperationContextImpl.clearContext();
       
-      server.start();
+      createServer();
       
       cursorProvider = lookupCursorProvider();
       cursor = cursorProvider.getPersistentCursor(queue.getID());
@@ -649,11 +670,11 @@
       
       // This is to make sure all the pending files will be deleted
       server.stop();
+      OperationContextImpl.clearContext();
+
+      createServer();
       
-      server.start();
-      
-      // TODO: this should be exact 2
-      assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
    
@@ -708,6 +729,17 @@
       OperationContextImpl.clearContext();
       System.out.println("Tmp:" + getTemporaryDir());
       
+      createServer();
+
+      //createQueue(ADDRESS.toString(), ADDRESS.toString());
+   }
+
+
+   /**
+    * @throws Exception
+    */
+   private void createServer() throws Exception
+   {
       Configuration config = createDefaultConfig();
       
       config.setJournalSyncNonTransactional(true);
@@ -720,11 +752,63 @@
 
       server.start();
       
-      queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+      try
+      {
+         queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+      }
+      catch (Exception ignored)
+      {
+       }
+   }
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PageCursor createNonPersistentCursor() throws Exception
+   {
+      return lookupCursorProvider().createNonPersistentCursor();
+   }
 
-      //createQueue(ADDRESS.toString(), ADDRESS.toString());
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PageCursorProvider lookupCursorProvider() throws Exception
+   {
+      return lookupPageStore(ADDRESS).getCursorProvier();
    }
 
+   /**
+    * @param storage
+    * @param pageStore
+    * @param pgParameter
+    * @param start
+    * @param NUM_MESSAGES
+    * @param messageSize
+    * @throws Exception
+    */
+   private void pgMessages(StorageManager storage,
+                           PagingStoreImpl pageStore,
+                           PageTransactionInfo pgParameter,
+                           int start,
+                           final int NUM_MESSAGES,
+                           final int messageSize) throws Exception
+   {
+      List<ServerMessage> messages = new ArrayList<ServerMessage>();
+      
+      for (int i = start ; i < start + NUM_MESSAGES; i++)
+      {
+         HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+         ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
+         msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+         msg.putIntProperty("key", i);
+         messages.add(msg);
+      }
+      
+      pageStore.page(messages, pgParameter.getTransactionID());
+   }
+   
+
    protected void tearDown() throws Exception
    {
       server.stop();



More information about the hornetq-commits mailing list