[jboss-cvs] JBoss Messaging SVN: r4790 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 8 15:30:38 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-08 15:30:38 -0400 (Fri, 08 Aug 2008)
New Revision: 4790

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Log:
PagingStore implementation and more tests

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-08 13:02:26 UTC (rev 4789)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-08 19:30:38 UTC (rev 4790)
@@ -37,8 +37,12 @@
 public interface PagingStore
 {
    
+   int getNumberOfPages();
+   
    String getStoreName();
    
+   void sync() throws Exception;
+   
    void writeOnCurrentPage(ServerMessage message) throws Exception;
    
    /** 
@@ -46,8 +50,9 @@
     * The file will still exist until Page.delete is called, 
     * So, case the system is reloaded the same Page will be loaded back if delete is not called.
     * @return
+    * @throws Exception 
     */
-   Page dequeuePage();
+   Page dequeuePage() throws Exception;
    
    void init() throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-08 13:02:26 UTC (rev 4789)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-08 19:30:38 UTC (rev 4790)
@@ -34,7 +34,12 @@
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.server.ServerMessage;
 
-public class PagingStoreImpl implements PagingStore
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PagingStoreImpl implements PagingStore, TestSupportPageStore
 {
 
    // Constants -----------------------------------------------------
@@ -73,22 +78,97 @@
 
    // PagingStore implementation ------------------------------------
    
+   
+   public int getNumberOfPages()
+   {
+      return numberOfPages;
+   }
+   
    public String getStoreName()
    {
       return storeName;
    }
    
-   public Page dequeuePage()
+   public Page dequeuePage() throws Exception
    {
-      // TODO Auto-generated method stub
-      return null;
+      
+      lock.writeLock().lock();
+      
+      try
+      {
+         
+         if (numberOfPages == 0)
+         {
+            return null;
+         }
+         else
+         {
+
+            numberOfPages--;
+            
+            final Page returnPage;
+            if (currentPageId == firstPageId)
+            {
+               if (currentPage != null)
+               {
+                  returnPage = currentPage;
+                  returnPage.close();
+                  currentPage = null;
+               }
+               else
+               {
+                  returnPage = createPage(currentPageId);
+               }
+               
+               firstPageId = Integer.MAX_VALUE;
+               
+               return returnPage;
+            }
+            else
+            {
+               returnPage =  createPage(firstPageId++);
+            }
+            
+            return returnPage;
+         }
+      }
+      finally
+      {
+         lock.writeLock().unlock();
+      }
+      
    }
 
    public void writeOnCurrentPage(ServerMessage message) throws Exception
    {
       checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+      
+      lock.readLock().lock();
+      
+      try
+      {
+         currentPage.write(message);
+      }
+      finally
+      {
+         lock.readLock().unlock();
+      }
    }
    
+   public void sync() throws Exception
+   {
+      lock.readLock().lock();
+      
+      try
+      {
+         currentPage.sync();
+      }
+      finally
+      {
+         lock.readLock().unlock();
+      }
+   }
+   
    public void init() throws Exception
    {
       
@@ -97,7 +177,7 @@
       try
       {
          
-         List<String> files = factory.listFiles(".page");
+         List<String> files = factory.listFiles("page");
          
          numberOfPages = files.size();
          
@@ -124,6 +204,14 @@
    
    
    
+   // TestSupportPageStore ------------------------------------------
+   
+   public void forceAnotherPage() throws Exception
+   {
+      openNewPage();
+   }
+   
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -134,7 +222,7 @@
 
    private synchronized void checkPage(int bytesToWrite) throws Exception
    {
-      if (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0)
+      if (currentPage == null || (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0))
       {
          openNewPage();
       }
@@ -149,14 +237,19 @@
          numberOfPages++;
          currentPageId++;
          
+         if (currentPageId < firstPageId)
+         {
+            firstPageId = currentPageId;
+         }
+         
          if (currentPage != null)
          {
             currentPage.close();
          }
          
-         SequentialFile file = factory.createSequentialFile(createFileName(currentPageId), 1000);
+         currentPage = createPage(currentPageId);
          
-         currentPage = new PageImpl(factory, file, currentPageId);
+         currentPage.open();
          
       }
       finally
@@ -164,13 +257,24 @@
          lock.writeLock().unlock();
       }
    }
+
+
+   private Page createPage(int page) throws Exception
+   {
+      String fileName = createFileName(page);
+      SequentialFile file = factory.createSequentialFile(fileName, 1000);
+      
+      return new PageImpl(factory, file, page);
+   }
    
    /**
     * 
+    * Note: DecimalFormat is not thread-safe; callers must synchronize before calling this method.
+    * 
     * @param pageID
     * @return
     */
-   private synchronized String createFileName(int pageID)
+   private String createFileName(int pageID)
    {
       return format.format(pageID) + ".page";
    }

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-08-08 19:30:38 UTC (rev 4790)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.paging.PagingStore;
+
+/**
+ * Additional methods required by test cases that exercise PagingStoreImpl.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface TestSupportPageStore extends PagingStore
+{
+   void forceAnotherPage() throws Exception;
+}

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-08 19:30:38 UTC (rev 4790)
@@ -0,0 +1,247 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+public class PagingStoreImplTest extends UnitTestCase
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   public void testStore() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+      
+      PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+      ByteBuffer buffer = createRandomBuffer(10);
+      
+      buffers.add(buffer);
+      SimpleString destination = new SimpleString("test");
+
+      ServerMessage msg = createMessage(1l, destination, buffer);
+      
+      storeImpl.writeOnCurrentPage(msg);
+      
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
+      storeImpl.sync();
+      
+      storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+      
+      storeImpl.init();
+      
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
+   }
+   
+   public void testDepageOnCurrentPage() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+      
+      PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      
+      SimpleString destination = new SimpleString("test");
+
+      for (int i = 0; i < 10; i++)
+      {
+
+         ByteBuffer buffer = createRandomBuffer(10);
+         
+         buffers.add(buffer);
+   
+         ServerMessage msg = createMessage(i+1l, destination, buffer);
+
+         storeImpl.writeOnCurrentPage(msg);
+      }
+      
+      
+      assertEquals(1, storeImpl.getNumberOfPages());
+      
+      storeImpl.sync();
+      
+      Page page = storeImpl.dequeuePage();
+      
+      
+      page.open();
+      
+      ServerMessage msg[] = page.read();
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      assertEquals(10, msg.length);
+      
+      for (int i = 0; i < 10; i++)
+      {
+         assertEquals(i + 1l, msg[i].getMessageID());
+         assertEqualsByteArrays(buffers.get(i).array(), msg[i].getBody().array());
+      }
+      
+   }
+   
+   public void testDepageMultiplPages() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+      
+      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      
+      SimpleString destination = new SimpleString("test");
+
+      for (int i = 0; i < 10; i++)
+      {
+
+         ByteBuffer buffer = createRandomBuffer(10);
+         
+         buffers.add(buffer);
+   
+         if (i == 5)
+         {
+            storeImpl.forceAnotherPage();
+         }
+         
+         
+         ServerMessage msg = createMessage(i+1l, destination, buffer);
+
+         storeImpl.writeOnCurrentPage(msg);
+      }
+      
+      
+      assertEquals(2, storeImpl.getNumberOfPages());
+      
+      storeImpl.sync();
+      
+      for (int pageNr = 0; pageNr < 2; pageNr++)
+      {
+         Page page = storeImpl.dequeuePage();
+         
+         page.open();
+         
+         ServerMessage msg[] = page.read();
+         
+         page.close();
+
+         assertEquals(5, msg.length);
+         
+         for (int i = 0; i < 5; i++)
+         {
+            assertEquals(pageNr*5 + i + 1l, msg[i].getMessageID());
+            assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getBody().array());
+         }
+      }
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      ServerMessage msg = createMessage(100, destination, buffers.get(0));
+      
+      storeImpl.writeOnCurrentPage(msg);
+      
+      Page page = storeImpl.dequeuePage();
+      
+      assertEquals(0, storeImpl.getNumberOfPages());
+      
+      page.open();
+      
+      ServerMessage msgs[] = page.read();
+      
+      assertEquals(1, msgs.length);
+      
+      assertEquals(100l, msgs[0].getMessageID());
+      
+      assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getBody().array());
+      
+      assertNull(storeImpl.dequeuePage());
+      
+      
+   }
+   
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+
+   private ServerMessage createMessage(long messageId,
+         SimpleString destination, ByteBuffer buffer)
+   {
+      ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+            System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+      
+      msg.setMessageID((long)messageId);
+      
+      msg.setDestination(destination);
+      return msg;
+   }
+
+   private ByteBuffer createRandomBuffer(int size)
+   {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      
+      for (int j = 0; j < buffer.limit(); j++)
+      {
+         buffer.put(RandomUtil.randomByte());
+      }
+      return buffer;
+   }
+   
+   
+   // Inner classes -------------------------------------------------
+   
+}




More information about the jboss-cvs-commits mailing list