[jboss-cvs] JBoss Messaging SVN: r4784 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 8 01:08:59 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-08 01:08:58 -0400 (Fri, 08 Aug 2008)
New Revision: 4784

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
Log:
Part of PagingStore implementation

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -32,10 +32,17 @@
  */
 public interface Page
 {
+   
+   int getPageId();
+   
    void write(ServerMessage message) throws Exception;
    
    ServerMessage[] read() throws Exception;
    
+   int getSize();
+   
+   int getNumberOfMessages();
+   
    void sync() throws Exception;
    
    void open() throws Exception;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -36,14 +36,18 @@
  */
 public interface PagingStore
 {
-   void writeOnCurrentPage(ServerMessage message);
    
+   String getStoreName();
    
+   void writeOnCurrentPage(ServerMessage message) throws Exception;
+   
    /** 
     * Remove the first page from the Writing Queue.
     * The file will still exist until Page.delete is called, 
     * So, case the system is reloaded the same Page will be loaded back if delete is not called.
     * @return
     */
-   Page getOnePage();
+   Page dequeuePage();
+   
+   void init() throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -25,6 +25,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -47,22 +48,32 @@
    
    private static final int SIZE_INTEGER = 4;
    
+   private static final int SIZE_LONG = 8;
+   
+   private static final int SIZE_BYTE = 1;
+   
+   public static final int SIZE_RECORD = SIZE_LONG + SIZE_INTEGER + SIZE_BYTE + SIZE_BYTE; 
+   
    public static final byte START_BYTE= (byte)'{';
    public static final byte END_BYTE= (byte)'}';
    
    // Attributes ----------------------------------------------------
    
+   private final int pageId;
+   private final AtomicInteger numberOfMessages = new AtomicInteger(0);
    private final SequentialFile file;
    private final SequentialFileFactory fileFactory;
    private final PagingCallback callback;
-   private volatile long size = -1;
+   private volatile long preallocSize = -1;
+   private final AtomicInteger size = new AtomicInteger(0);
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
+   public PageImpl(final SequentialFileFactory factory, final SequentialFile file, int pageId) throws Exception
    {
+      this.pageId = pageId;
       this.file = file;
       this.fileFactory = factory;
       if (factory.isSupportsCallbacks())
@@ -81,6 +92,12 @@
    
    // PagingFile implementation
    
+   
+   public int getPageId()
+   {
+      return pageId;
+   }
+   
    public ServerMessage[] read() throws Exception
    {
       
@@ -123,23 +140,25 @@
          }
       }
       
+      numberOfMessages.set(messages.size());
+      
       return messages.toArray(instantiateArray(messages.size()));
    }
    
    public void write(final ServerMessage message) throws Exception
    {
-      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6 + 8);
+      ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
       buffer.put(START_BYTE);
-      buffer.putInt(message.getEncodeSize() + 8);
+      buffer.putInt(message.getEncodeSize() + SIZE_LONG);
       buffer.putLong(message.getMessageID());
       message.encode(new ByteBufferWrapper(buffer));
       buffer.put(END_BYTE);
       buffer.rewind();
-      expandIfNeeded(buffer.limit());
 
       if (callback != null)
       {
          callback.countUp();
+         expandIfNeeded(buffer.limit());
          file.write(buffer, callback);
       }
       else
@@ -147,6 +166,9 @@
          file.write(buffer, false);
       }
       
+      numberOfMessages.incrementAndGet();
+      size.addAndGet(buffer.limit());
+      
    }
    
    public void sync() throws Exception
@@ -164,6 +186,8 @@
    public void open() throws Exception
    {
       file.open();
+      this.size.set((int)file.size());
+      file.position((int)file.size());
    }
    
    public void close() throws Exception
@@ -176,6 +200,16 @@
       file.delete();
    }
    
+   public int getNumberOfMessages()
+   {
+      return numberOfMessages.intValue();
+   }
+   
+   public int getSize()
+   {
+      return this.size.intValue();
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -192,20 +226,23 @@
    
    // Private -------------------------------------------------------
    
+   
+   // only used on AIO 
+   // (This method might go away if we decide to not allow AIO on paging.)
    private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
    {
-      if (size < 0)
+      if (preallocSize < 0)
       {
-         size = file.size();
+         preallocSize = file.size();
       }
       
-      while (file.position() + bytesToWrite > size)
+      while (file.position() + bytesToWrite > preallocSize)
       {
          final int position = file.position();
          
-         file.fill((int)size, 1024*1024, (byte)0);
+         file.fill((int)preallocSize, 1024*1024, (byte)0);
          
-         size = file.size();
+         preallocSize = file.size();
          
          file.position(position);
       }

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -0,0 +1,185 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.server.ServerMessage;
+
+public class PagingStoreImpl implements PagingStore
+{
+
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private final DecimalFormat format = new DecimalFormat("000000000");
+   
+   private final String storeName;
+   
+   private final SequentialFileFactory factory;
+   
+   private final long maxPageSize;
+   
+   private volatile int numberOfPages;
+   private volatile int firstPageId = Integer.MAX_VALUE;
+   private volatile int currentPageId;
+   private volatile Page currentPage;
+   private final ReadWriteLock lock = new ReentrantReadWriteLock();
+   
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   
+   public PagingStoreImpl(final SequentialFileFactory factory, final String storeName, final long maxPageSize) 
+   {
+      this.factory = factory;
+      this.storeName = storeName;
+      this.maxPageSize = maxPageSize;
+   }
+   
+   
+   // Public --------------------------------------------------------
+
+   // PagingStore implementation ------------------------------------
+   
+   public String getStoreName()
+   {
+      return storeName;
+   }
+   
+   public Page dequeuePage()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
+   public void writeOnCurrentPage(ServerMessage message) throws Exception
+   {
+      checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+   }
+   
+   public void init() throws Exception
+   {
+      
+      lock.writeLock().lock();
+      
+      try
+      {
+         
+         List<String> files = factory.listFiles(".page");
+         
+         numberOfPages = files.size();
+         
+         for (String fileName: files)
+         {
+            final int fileId = getPageIdFromFileName(fileName);
+            
+            if (fileId > currentPageId)
+            {
+               currentPageId = fileId;
+            }
+            
+            if (fileId < firstPageId)
+            {
+               firstPageId = fileId;
+            }
+         }
+      }
+      finally
+      {
+         lock.writeLock().unlock();
+      }
+   }
+   
+   
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   
+   // Private -------------------------------------------------------
+   
+
+   private synchronized void checkPage(int bytesToWrite) throws Exception
+   {
+      if (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0)
+      {
+         openNewPage();
+      }
+   }
+   
+   private void openNewPage() throws Exception
+   {
+      lock.writeLock().lock();
+      
+      try
+      {
+         numberOfPages++;
+         currentPageId++;
+         
+         if (currentPage != null)
+         {
+            currentPage.close();
+         }
+         
+         SequentialFile file = factory.createSequentialFile(createFileName(currentPageId), 1000);
+         
+         currentPage = new PageImpl(factory, file, currentPageId);
+         
+      }
+      finally
+      {
+         lock.writeLock().unlock();
+      }
+   }
+   
+   /**
+    * 
+    * @param pageID
+    * @return
+    */
+   private synchronized String createFileName(int pageID)
+   {
+      return format.format(pageID) + ".page";
+   }
+   
+   private static int getPageIdFromFileName(String fileName)
+   {
+      return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
+   }
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -82,11 +82,11 @@
       
       EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
       
-      SequentialFile file = EasyMock.createMock(SequentialFile.class);
+      SequentialFile file = EasyMock.createStrictMock(SequentialFile.class);
       
       EasyMock.replay(factory, file);
       
-      PageImpl impl = new PageImpl(factory, file);
+      PageImpl impl = new PageImpl(factory, file, 1);
       
       EasyMock.verify(factory, file);
       
@@ -117,18 +117,24 @@
       
       final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
       
-      EasyMock.expect(file.position()).andStubReturn(0);
+      if (callback)
+      {
+         
+         // Expanding file (you need that on libaio)
+         EasyMock.expect(file.position()).andStubReturn(0);
 
-      EasyMock.expect(file.size()).andReturn(0l);
-      
-      file.fill(0, 1024 * 1024, (byte)0);
-      
-      EasyMock.expect(file.size()).andReturn(1024l * 1024l);
+         EasyMock.expect(file.size()).andReturn(0l);
+         
+         file.fill(0, 1024 * 1024, (byte)0);
+         
+         EasyMock.expect(file.size()).andReturn(1024l * 1024l);
 
-      file.position(0);
-      
-      if (callback)
-      {
+         file.position(0);
+         
+         // End expanding file
+         
+
+         
          EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
 
             public Integer answer() throws Throwable

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-08-08 05:08:58 UTC (rev 4784)
@@ -57,10 +57,12 @@
    protected void testAdd(SequentialFileFactory factory, int numberOfElements) throws Exception
    {
       
-      SequentialFile file = factory.createSequentialFile("testPage.page", 1);
+      SequentialFile file = factory.createSequentialFile("00010.page", 1);
       
-      PageImpl impl = new PageImpl(factory, file);
+      PageImpl impl = new PageImpl(factory, file, 10);
       
+      assertEquals(10, impl.getPageId());
+      
       impl.open();
 
       assertEquals(1, factory.listFiles("page").size());
@@ -88,17 +90,23 @@
          msg.setDestination(simpleDestination);
          
          impl.write(msg);
+         
+         assertEquals(i + 1, impl.getNumberOfMessages());
       }
       
       impl.sync();
       impl.close();
       
-      file = factory.createSequentialFile("testPage.page", 1);
+      file = factory.createSequentialFile("00010.page", 1);
       file.open();
-      impl = new PageImpl(factory, file);
+      impl = new PageImpl(factory, file, 10);
       
       ServerMessage msgs[] = impl.read();
       
+      assertEquals(numberOfElements, msgs.length);
+
+      assertEquals(numberOfElements, impl.getNumberOfMessages());
+      
       for (int i = 0; i < msgs.length; i++)
       {
          assertEquals((long)i, msgs[i].getMessageID());
@@ -107,8 +115,6 @@
          
          assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
       }
-      
-      assertEquals(numberOfElements, msgs.length);
 
       impl.delete();
       




More information about the jboss-cvs-commits mailing list