[hornetq-commits] JBoss hornetq SVN: r9883 - branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 12 15:24:00 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-12 15:24:00 -0500 (Fri, 12 Nov 2010)
New Revision: 9883

Modified:
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding new test

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-12 19:37:52 UTC (rev 9882)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-12 20:24:00 UTC (rev 9883)
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -621,6 +622,116 @@
 
    }
    
+
+   /**
+    * Pages NUM_TX * MSGS_TX messages from a producer thread, every other batch inside a transaction,
+    * while this thread consumes and acknowledges them through the queue's page subscription iterator.
+    * @throws Exception if consuming fails or the wait for the producer thread is interrupted
+    */
+   public void testConsumeLivePageMultiThread() throws Exception
+   {
+      final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+      pageStore.startPaging();
+
+      final int NUM_TX = 100;
+      final int MSGS_TX = 100;
+      final int TOTAL_MSG = NUM_TX * MSGS_TX;
+      final int messageSize = 1024;
+
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+
+      PageSubscription cursor = this.server.getPagingManager()
+                                           .getPageStore(ADDRESS)
+                                           .getCursorProvier()
+                                           .getSubscription(queue.getID());
+      System.out.println("Cursor: " + cursor);
+
+      final StorageManager storage = this.server.getStorageManager();
+
+      // Failures on the producer thread cannot fail the test directly, so they are counted here
+      final AtomicInteger exceptions = new AtomicInteger(0);
+
+      Thread t1 = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               int count = 0;
+
+               for (int txCount = 0; txCount < NUM_TX; txCount++)
+               {
+                  // Even batches are paged inside a transaction, odd batches without one
+                  Transaction tx = null;
+                  if (txCount % 2 == 0)
+                  {
+                     tx = new TransactionImpl(storage);
+                  }
+
+                  RoutingContext ctx = generateCTX(tx);
+
+                  for (int i = 0 ; i < MSGS_TX; i++)
+                  {
+                     HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
+                     // NOTE(review): first argument (presumably the message id) restarts at 0 per batch; only "key" is unique - confirm
+                     ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+                     msg.putIntProperty("key", count++);
+                     msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+                     Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+                  }
+
+                  if (tx != null)
+                  {
+                     tx.commit();
+                  }
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               exceptions.incrementAndGet();
+            }
+         }
+      };
+
+      t1.start();
+
+      try
+      {
+         LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+         for (int i = 0 ; i < TOTAL_MSG; i++ )
+         {
+            assertEquals(0, exceptions.get());
+            // The producer may be behind the consumer: poll up to 5 times, one second apart
+            PagedReference ref = null;
+            for (int repeat = 0 ; repeat < 5; repeat++)
+            {
+               ref = iterator.next();
+               if (ref != null)
+               {
+                  break;
+               }
+               Thread.sleep(1000);
+            }
+            assertNotNull(ref);
+
+            ref.acknowledge();
+
+            System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
+         }
+      }
+      finally
+      {
+         // Wait (bounded) for the producer so its last commit and any late failure are seen before the final check
+         t1.join(60000);
+      }
+      Assert.assertFalse("producer thread did not finish", t1.isAlive());
+      assertEquals(0, exceptions.get());
+   }
+   
    private RoutingContextImpl generateCTX()
    {
       return generateCTX(null);



More information about the hornetq-commits mailing list