[jboss-cvs] JBoss Messaging SVN: r4793 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 12 09:55:30 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-12 09:55:30 -0400 (Tue, 12 Aug 2008)
New Revision: 4793

Added:
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Paging store concurrency tests

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -48,4 +48,6 @@
    
    int getAlignment();
    
+   int calculateBlockSize(int bytes) throws Exception;
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -77,4 +77,13 @@
       newbuffer.put(bytes);
       return newbuffer;
    }
+
+   public int calculateBlockSize(int position) throws Exception
+   {
+      int alignment = getAlignment();
+      
+      int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+      
+      return pos;
+   }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -67,6 +67,11 @@
    {
       return 1;
    }
+
+   public int calculateBlockSize(int bytes) throws Exception
+   {
+      return bytes;
+   }
    
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -41,9 +41,11 @@
    
    String getStoreName();
    
+   void startPage() throws Exception;
+   
    void sync() throws Exception;
    
-   void writeOnCurrentPage(ServerMessage message) throws Exception;
+   boolean writeOnCurrentPage(ServerMessage message) throws Exception;
    
    /** 
     * Remove the first page from the Writing Queue.

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -64,7 +64,6 @@
    private final SequentialFile file;
    private final SequentialFileFactory fileFactory;
    private final PagingCallback callback;
-   private volatile long preallocSize = -1;
    private final AtomicInteger size = new AtomicInteger(0);
    
    // Static --------------------------------------------------------
@@ -158,7 +157,6 @@
       if (callback != null)
       {
          callback.countUp();
-         expandIfNeeded(buffer.limit());
          file.write(buffer, callback);
       }
       else
@@ -187,7 +185,7 @@
    {
       file.open();
       this.size.set((int)file.size());
-      file.position((int)file.size());
+      file.position(0);
    }
    
    public void close() throws Exception
@@ -227,27 +225,6 @@
    // Private -------------------------------------------------------
    
    
-   // only used on AIO 
-   // (This method might go away if we decide to not allow AIO on paging.)
-   private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
-   {
-      if (preallocSize < 0)
-      {
-         preallocSize = file.size();
-      }
-      
-      while (file.position() + bytesToWrite > preallocSize)
-      {
-         final int position = file.position();
-         
-         file.fill((int)preallocSize, 1024*1024, (byte)0);
-         
-         preallocSize = file.size();
-         
-         file.position(position);
-      }
-   }
-   
    // Inner classes -------------------------------------------------
 
    private static class PagingCallback implements IOCallback

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -25,6 +25,8 @@
 
 import java.text.DecimalFormat;
 import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -48,6 +50,8 @@
    
    private final DecimalFormat format = new DecimalFormat("000000000");
    
+   private final AtomicInteger pageUsedSize = new AtomicInteger(0);
+   
    private final String storeName;
    
    private final SequentialFileFactory factory;
@@ -58,7 +62,12 @@
    private volatile int firstPageId = Integer.MAX_VALUE;
    private volatile int currentPageId;
    private volatile Page currentPage;
+
+   // This is supposed to perform better than synchronized methods
+   private final Semaphore globalLock = new Semaphore(1);
+
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+   private volatile boolean initialized = false;
    
    
    // Static --------------------------------------------------------
@@ -91,7 +100,9 @@
    
    public Page dequeuePage() throws Exception
    {
+      validateInit();
       
+      globalLock.acquire();
       lock.writeLock().lock();
       
       try
@@ -135,28 +146,72 @@
       finally
       {
          lock.writeLock().unlock();
+         globalLock.release();
       }
       
    }
 
-   public void writeOnCurrentPage(ServerMessage message) throws Exception
+   public boolean writeOnCurrentPage(ServerMessage message) throws Exception
    {
-      checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+      validateInit();
       
-      lock.readLock().lock();
+      int bytesToWrite = factory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
       
+      globalLock.acquire();
+
       try
       {
-         currentPage.write(message);
+         if (currentPage == null)
+         {
+            return false;
+         }
+         
+         if ((pageUsedSize.addAndGet(bytesToWrite) > maxPageSize && currentPage.getNumberOfMessages() > 0))
+         {
+            lock.writeLock().lock();
+            try
+            {
+               openNewPage();
+               pageUsedSize.addAndGet(bytesToWrite);
+            }
+            finally
+            {
+               lock.writeLock().unlock();
+            }
+         }
+         // we must get the readLock before we release the globalLock
+         // or else we could end up with files records being added to the currentPage even if the max size was already achieved.
+         // (Condition tested by PagingStoreTestPage::testConcurrentPaging, The test would eventually fail, 1 in 100)
+         lock.readLock().lock();
+
       }
       finally
       {
+         globalLock.release();
+      }
+      
+      try
+      {
+         if (currentPage != null)
+         {
+            currentPage.write(message);
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+      finally
+      {
          lock.readLock().unlock();
       }
    }
    
    public void sync() throws Exception
    {
+      validateInit();
+      
       lock.readLock().lock();
       
       try
@@ -171,6 +226,11 @@
    
    public void init() throws Exception
    {
+
+      if (initialized)
+      {
+         throw new IllegalStateException("PageStore " + this.storeName + " already initialized!");
+      }
       
       lock.writeLock().lock();
       
@@ -195,6 +255,13 @@
                firstPageId = fileId;
             }
          }
+         
+         initialized = true;
+
+         if (numberOfPages != 0)
+         {
+            startPage();
+         }
       }
       finally
       {
@@ -202,12 +269,31 @@
       }
    }
    
+   public void startPage() throws Exception
+   {
+      validateInit();
+
+      globalLock.acquire();
+      try
+      {
+         if (currentPage == null)
+         {
+            openNewPage();
+         }
+      }
+      finally
+      {
+         globalLock.release();
+      }
+   }
    
    
+   
    // TestSupportPageStore ------------------------------------------
    
    public void forceAnotherPage() throws Exception
    {
+      validateInit();
       openNewPage();
    }
    
@@ -220,14 +306,6 @@
    // Private -------------------------------------------------------
    
 
-   private synchronized void checkPage(int bytesToWrite) throws Exception
-   {
-      if (currentPage == null || (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0))
-      {
-         openNewPage();
-      }
-   }
-   
    private void openNewPage() throws Exception
    {
       lock.writeLock().lock();
@@ -249,6 +327,8 @@
          
          currentPage = createPage(currentPageId);
          
+         pageUsedSize.set(0);
+         
          currentPage.open();
          
       }
@@ -264,6 +344,19 @@
       String fileName = createFileName(page);
       SequentialFile file = factory.createSequentialFile(fileName, 1000);
       
+      file.open();
+      
+      long size = file.size();
+      
+      if (factory.isSupportsCallbacks() && size < maxPageSize)
+      {
+         file.fill((int)size, (int)maxPageSize - (int)size, (byte)0);
+      }
+      
+      file.position(0);
+      
+      file.close();
+      
       return new PageImpl(factory, file, page);
    }
    
@@ -284,6 +377,14 @@
       return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
    }
    
+   private void validateInit()
+   {
+      if (!initialized)
+      {
+         throw new IllegalStateException("PagingStore " + this.storeName + " not initialized!");
+      }
+   }
+   
    // Inner classes -------------------------------------------------
    
 }

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -78,7 +78,7 @@
    protected void tearDown() throws Exception
    {
       super.tearDown();
-      //deleteDirectory(new File(journalDir));
+      deleteDirectory(new File(journalDir));
    }
 
    // Private -------------------------------------------------------

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.io.File;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.paging.impl.PagingStoreTestBase;
+
+public class PagingStoreIntegrationTest extends PagingStoreTestBase
+{
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+
+   protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") +  "/journal-test";
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   public void testPageStoreWithAIO() throws Exception
+   {
+      if (!AsynchronousFileImpl.isLoaded())
+      {
+         fail(String.format("libAIO is not loaded on %s %s %s", 
+               System.getProperty("os.name"), 
+               System.getProperty("os.arch"), 
+               System.getProperty("os.version")));
+      }
+      testConcurrentPaging(new AIOSequentialFileFactory(journalDir), 10);
+   }
+   
+   public void testPageWithNIO() throws Exception
+   {
+      // This integration test could fail 1 in 100 due to race conditions.
+      for (int i = 0; i < 100; i++)
+      {
+         recreateDirectory();
+         System.out.println("Test " + i);
+         testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 10);
+      }
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      recreateDirectory();
+   }
+
+   
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      //deleteDirectory(new File(journalDir));
+   }
+
+   // Private -------------------------------------------------------
+
+   private void recreateDirectory()
+   {
+      File fileJournalDir = new File(journalDir);
+      deleteDirectory(fileJournalDir);
+      fileJournalDir.mkdirs();
+   }
+
+   // Inner classes -------------------------------------------------
+   
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -140,6 +140,15 @@
       return ByteBuffer.allocateDirect(size);
    }
    
+   public int calculateBlockSize(int position) throws Exception
+   {
+      int alignment = getAlignment();
+      
+      int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+      
+      return pos;
+   }
+   
    public ByteBuffer wrapBuffer(byte[] bytes)
    {
       return ByteBuffer.wrap(bytes);
@@ -375,7 +384,7 @@
          return data.position();
       }
 
-      public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+      public synchronized int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
       {
          if (!open)
          {

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -119,22 +119,6 @@
       
       if (callback)
       {
-         
-         // Expanding file (you need that on libaio)
-         EasyMock.expect(file.position()).andStubReturn(0);
-
-         EasyMock.expect(file.size()).andReturn(0l);
-         
-         file.fill(0, 1024 * 1024, (byte)0);
-         
-         EasyMock.expect(file.size()).andReturn(1024l * 1024l);
-
-         file.position(0);
-         
-         // End expanding file
-         
-
-         
          EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
 
             public Integer answer() throws Throwable

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -36,6 +36,11 @@
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
 public abstract class PageImplTestBase extends UnitTestCase
 {
    

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -32,15 +32,16 @@
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.tests.util.RandomUtil;
-import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
-public class PagingStoreImplTest extends UnitTestCase
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PagingStoreImplTest extends PagingStoreTestBase
 {
    
    // Constants -----------------------------------------------------
@@ -59,8 +60,14 @@
       
       PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
       
+      storeImpl.init();
+      
       assertEquals(0, storeImpl.getNumberOfPages());
       
+      storeImpl.startPage();
+
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
 
       ByteBuffer buffer = createRandomBuffer(10);
@@ -70,7 +77,7 @@
 
       ServerMessage msg = createMessage(1l, destination, buffer);
       
-      storeImpl.writeOnCurrentPage(msg);
+      assertTrue(storeImpl.writeOnCurrentPage(msg));
       
       assertEquals(1, storeImpl.getNumberOfPages());
       
@@ -80,7 +87,7 @@
       
       storeImpl.init();
       
-      assertEquals(1, storeImpl.getNumberOfPages());
+      assertEquals(2, storeImpl.getNumberOfPages());
       
    }
    
@@ -90,8 +97,12 @@
       
       PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
       
+      storeImpl.init();
+      
       assertEquals(0, storeImpl.getNumberOfPages());
       
+      storeImpl.startPage();
+      
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
       
       SimpleString destination = new SimpleString("test");
@@ -105,7 +116,7 @@
    
          ServerMessage msg = createMessage(i+1l, destination, buffer);
 
-         storeImpl.writeOnCurrentPage(msg);
+         assertTrue(storeImpl.writeOnCurrentPage(msg));
       }
       
       
@@ -132,14 +143,20 @@
       
    }
    
-   public void testDepageMultiplPages() throws Exception
+   public void testDepageMultiplePages() throws Exception
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
       TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
       
+      storeImpl.init();
+      
       assertEquals(0, storeImpl.getNumberOfPages());
       
+      storeImpl.startPage();
+      
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
       
       SimpleString destination = new SimpleString("test");
@@ -159,7 +176,7 @@
          
          ServerMessage msg = createMessage(i+1l, destination, buffer);
 
-         storeImpl.writeOnCurrentPage(msg);
+         assertTrue(storeImpl.writeOnCurrentPage(msg));
       }
       
       
@@ -190,8 +207,12 @@
       
       ServerMessage msg = createMessage(100, destination, buffers.get(0));
       
-      storeImpl.writeOnCurrentPage(msg);
+      assertFalse(storeImpl.writeOnCurrentPage(msg));
       
+      storeImpl.startPage();
+
+      assertTrue(storeImpl.writeOnCurrentPage(msg));
+      
       Page page = storeImpl.dequeuePage();
       
       assertEquals(0, storeImpl.getNumberOfPages());
@@ -211,35 +232,14 @@
       
    }
    
-
-   // Package protected ---------------------------------------------
    
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-
-   private ServerMessage createMessage(long messageId,
-         SimpleString destination, ByteBuffer buffer)
+   public void testConcurrentDepage() throws Exception
    {
-      ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
-            System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+      SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
       
-      msg.setMessageID((long)messageId);
+      testConcurrentPaging(factory, 10);
       
-      msg.setDestination(destination);
-      return msg;
    }
-
-   private ByteBuffer createRandomBuffer(int size)
-   {
-      ByteBuffer buffer = ByteBuffer.allocate(size);
-      
-      for (int j = 0; j < buffer.limit(); j++)
-      {
-         buffer.put(RandomUtil.randomByte());
-      }
-      return buffer;
-   }
    
    
    // Inner classes -------------------------------------------------

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-12 13:55:30 UTC (rev 4793)
@@ -0,0 +1,320 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public abstract class PagingStoreTestBase extends UnitTestCase
+{
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+
+   protected void testConcurrentPaging(SequentialFileFactory factory, final int numberOfThreads) throws Exception,
+         InterruptedException
+   {
+      
+      final int MAX_SIZE = 1024 * 10;
+      
+      final AtomicLong messageIdGenerator = new AtomicLong(0);
+      
+      final AtomicInteger aliveProducers = new AtomicInteger(numberOfThreads);
+      
+      final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
+      
+      final ConcurrentHashMap<Long, ServerMessage> buffers = new ConcurrentHashMap<Long, ServerMessage>();
+      
+      final ArrayList<Page> readPages = new ArrayList<Page>();
+      
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", MAX_SIZE);
+      
+      storeImpl.init();
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      storeImpl.startPage();
+      
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
+      
+   
+      
+      final SimpleString destination = new SimpleString("test");
+      
+      class ProducerThread extends Thread
+      {
+         
+         Exception e;
+         
+         public void run()
+         {
+            
+            try
+            {
+               boolean firstTime = true;
+               while (true)
+               {
+                  long id = messageIdGenerator.incrementAndGet();
+                  ServerMessage msg = createMessage(id, destination, createRandomBuffer(5));
+                  if (storeImpl.writeOnCurrentPage(msg))
+                  {
+                     buffers.put(id, msg);
+                  }
+                  else
+                  {
+                     break;
+                  }
+                  
+                  if (firstTime)
+                  {
+                     latchStart.countDown();
+                     firstTime = false;
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               this.e = e;
+            }
+            finally
+            {
+               aliveProducers.decrementAndGet();
+            }
+         }
+      }
+      
+      class ConsumerThread extends Thread
+      {
+         Exception e;
+         
+         public void run()
+         {
+            try
+            {
+               // Wait every producer to produce at least one message
+               latchStart.await();
+               while (aliveProducers.get() > 0)
+               {
+                  Page page = storeImpl.dequeuePage();
+                  if (page != null)
+                  {
+                     readPages.add(page);
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               this.e = e;
+            }
+         }
+      }
+      
+      ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
+      
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         producerThread[i] = new ProducerThread();
+         producerThread[i].start();
+      }
+      
+      ConsumerThread consumer = new ConsumerThread();
+      consumer.start();
+      
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         producerThread[i].join();
+         if (producerThread[i].e != null)
+         {
+            throw producerThread[i].e;
+         }
+      }
+      
+      consumer.join();
+      
+      if (consumer.e != null)
+      {
+         throw consumer.e;
+      }
+   
+      System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
+      
+      final ConcurrentHashMap<Long, ServerMessage> buffers2 = new ConcurrentHashMap<Long, ServerMessage>();
+      
+      for (Page page: readPages)
+      {
+         page.open();
+         ServerMessage msgs[] = page.read();
+         page.close();
+         
+         for (ServerMessage msg : msgs)
+         {
+            ServerMessage msgWritten = buffers.remove(msg.getMessageID());
+            buffers2.put(msg.getMessageID(), msg);
+            assertNotNull(msgWritten);
+            assertEquals (msg.getDestination(), msgWritten.getDestination());
+            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+         }
+      }
+      
+      assertEquals (0, buffers.size());
+      
+      List<String> files = factory.listFiles("page");
+      
+      assertTrue(files.size() != 0);
+      
+      for (String file: files)
+      {
+         SequentialFile fileTmp = factory.createSequentialFile(file, 1);
+         fileTmp.open();
+         assertTrue (fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
+         fileTmp.close();         
+      }
+      
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, "test", MAX_SIZE);
+      storeImpl2.init();
+      
+      int numberOfPages = storeImpl2.getNumberOfPages();
+      assertTrue(numberOfPages != 0);
+      
+      storeImpl2.startPage();
+      
+
+      
+      storeImpl2.startPage();
+
+      
+      assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
+      
+      long lastMessageId = messageIdGenerator.incrementAndGet();
+      ServerMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+      
+      storeImpl2.writeOnCurrentPage(lastMsg);
+      buffers2.put(lastMessageId, lastMsg);
+      
+      Page lastPage = null;
+      while (true)
+      {
+         Page page = storeImpl2.dequeuePage();
+         if (page == null)
+         {
+            break;
+         }
+         
+         lastPage = page;
+         
+         page.open();
+         
+         ServerMessage[] msgs = page.read();
+         
+         page.close();
+
+         for (ServerMessage msg: msgs)
+         {
+            
+            ServerMessage msgWritten = buffers2.remove(msg.getMessageID());
+            assertNotNull(msgWritten);
+            assertEquals (msg.getDestination(), msgWritten.getDestination());
+            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+         }
+      }
+      
+      
+      lastPage.open();
+      ServerMessage lastMessages[] = lastPage.read();
+      lastPage.close();
+      assertEquals(1, lastMessages.length);
+      
+      assertEquals(lastMessages[0].getMessageID(), lastMessageId);
+      assertEqualsByteArrays(lastMessages[0].getBody().array(), lastMsg.getBody().array());
+      
+      System.out.println("Last Message ID = " + lastMsg.getMessageID());
+      
+      
+      assertEquals(0, buffers2.size());
+      
+      
+   }
+
+   protected ServerMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+   {
+      ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+            System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+      
+      msg.setMessageID((long)messageId);
+      
+      msg.setDestination(destination);
+      return msg;
+   }
+
+   protected ByteBuffer createRandomBuffer(int size)
+   {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      
+      for (int j = 0; j < buffer.limit(); j++)
+      {
+         buffer.put(RandomUtil.randomByte());
+      }
+      return buffer;
+   }
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}




More information about the jboss-cvs-commits mailing list