[jboss-cvs] JBoss Messaging SVN: r4784 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 8 01:08:59 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-08 01:08:58 -0400 (Fri, 08 Aug 2008)
New Revision: 4784
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
Log:
Part of PagingStore implementation
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Page.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -32,10 +32,17 @@
*/
public interface Page
{
+
+ int getPageId();
+
void write(ServerMessage message) throws Exception;
ServerMessage[] read() throws Exception;
+ int getSize();
+
+ int getNumberOfMessages();
+
void sync() throws Exception;
void open() throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -36,14 +36,18 @@
*/
public interface PagingStore
{
- void writeOnCurrentPage(ServerMessage message);
+ String getStoreName();
+ void writeOnCurrentPage(ServerMessage message) throws Exception;
+
/**
* Remove the first page from the Writing Queue.
* The file will still exist until Page.delete is called,
* So, case the system is reloaded the same Page will be loaded back if delete is not called.
* @return
*/
- Page getOnePage();
+ Page dequeuePage();
+
+ void init() throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
@@ -47,22 +48,32 @@
private static final int SIZE_INTEGER = 4;
+ private static final int SIZE_LONG = 8;
+
+ private static final int SIZE_BYTE = 1;
+
+ public static final int SIZE_RECORD = SIZE_LONG + SIZE_INTEGER + SIZE_BYTE + SIZE_BYTE;
+
public static final byte START_BYTE= (byte)'{';
public static final byte END_BYTE= (byte)'}';
// Attributes ----------------------------------------------------
+ private final int pageId;
+ private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
private final PagingCallback callback;
- private volatile long size = -1;
+ private volatile long preallocSize = -1;
+ private final AtomicInteger size = new AtomicInteger(0);
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageImpl(final SequentialFileFactory factory, final SequentialFile file) throws Exception
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file, int pageId) throws Exception
{
+ this.pageId = pageId;
this.file = file;
this.fileFactory = factory;
if (factory.isSupportsCallbacks())
@@ -81,6 +92,12 @@
// PagingFile implementation
+
+ public int getPageId()
+ {
+ return pageId;
+ }
+
public ServerMessage[] read() throws Exception
{
@@ -123,23 +140,25 @@
}
}
+ numberOfMessages.set(messages.size());
+
return messages.toArray(instantiateArray(messages.size()));
}
public void write(final ServerMessage message) throws Exception
{
- ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + 6 + 8);
+ ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
buffer.put(START_BYTE);
- buffer.putInt(message.getEncodeSize() + 8);
+ buffer.putInt(message.getEncodeSize() + SIZE_LONG);
buffer.putLong(message.getMessageID());
message.encode(new ByteBufferWrapper(buffer));
buffer.put(END_BYTE);
buffer.rewind();
- expandIfNeeded(buffer.limit());
if (callback != null)
{
callback.countUp();
+ expandIfNeeded(buffer.limit());
file.write(buffer, callback);
}
else
@@ -147,6 +166,9 @@
file.write(buffer, false);
}
+ numberOfMessages.incrementAndGet();
+ size.addAndGet(buffer.limit());
+
}
public void sync() throws Exception
@@ -164,6 +186,8 @@
public void open() throws Exception
{
file.open();
+ this.size.set((int)file.size());
+ file.position((int)file.size());
}
public void close() throws Exception
@@ -176,6 +200,16 @@
file.delete();
}
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages.intValue();
+ }
+
+ public int getSize()
+ {
+ return this.size.intValue();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -192,20 +226,23 @@
// Private -------------------------------------------------------
+
+ // only used on AIO
+ // (This method might go away if we decide to not allow AIO on paging.)
private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
{
- if (size < 0)
+ if (preallocSize < 0)
{
- size = file.size();
+ preallocSize = file.size();
}
- while (file.position() + bytesToWrite > size)
+ while (file.position() + bytesToWrite > preallocSize)
{
final int position = file.position();
- file.fill((int)size, 1024*1024, (byte)0);
+ file.fill((int)preallocSize, 1024*1024, (byte)0);
- size = file.size();
+ preallocSize = file.size();
file.position(position);
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -0,0 +1,185 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.server.ServerMessage;
+
+public class PagingStoreImpl implements PagingStore
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final DecimalFormat format = new DecimalFormat("000000000");
+
+ private final String storeName;
+
+ private final SequentialFileFactory factory;
+
+ private final long maxPageSize;
+
+ private volatile int numberOfPages;
+ private volatile int firstPageId = Integer.MAX_VALUE;
+ private volatile int currentPageId;
+ private volatile Page currentPage;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+
+ public PagingStoreImpl(final SequentialFileFactory factory, final String storeName, final long maxPageSize)
+ {
+ this.factory = factory;
+ this.storeName = storeName;
+ this.maxPageSize = maxPageSize;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ // PagingStore implementation ------------------------------------
+
+ public String getStoreName()
+ {
+ return storeName;
+ }
+
+ public Page dequeuePage()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void writeOnCurrentPage(ServerMessage message) throws Exception
+ {
+ checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+ }
+
+ public void init() throws Exception
+ {
+
+ lock.writeLock().lock();
+
+ try
+ {
+
+ List<String> files = factory.listFiles(".page");
+
+ numberOfPages = files.size();
+
+ for (String fileName: files)
+ {
+ final int fileId = getPageIdFromFileName(fileName);
+
+ if (fileId > currentPageId)
+ {
+ currentPageId = fileId;
+ }
+
+ if (fileId < firstPageId)
+ {
+ firstPageId = fileId;
+ }
+ }
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+
+ // Private -------------------------------------------------------
+
+
+ private synchronized void checkPage(int bytesToWrite) throws Exception
+ {
+ if (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0)
+ {
+ openNewPage();
+ }
+ }
+
+ private void openNewPage() throws Exception
+ {
+ lock.writeLock().lock();
+
+ try
+ {
+ numberOfPages++;
+ currentPageId++;
+
+ if (currentPage != null)
+ {
+ currentPage.close();
+ }
+
+ SequentialFile file = factory.createSequentialFile(createFileName(currentPageId), 1000);
+
+ currentPage = new PageImpl(factory, file, currentPageId);
+
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ *
+ * @param pageID
+ * @return
+ */
+ private synchronized String createFileName(int pageID)
+ {
+ return format.format(pageID) + ".page";
+ }
+
+ private static int getPageIdFromFileName(String fileName)
+ {
+ return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -82,11 +82,11 @@
EasyMock.expect(factory.isSupportsCallbacks()).andStubReturn(callback);
- SequentialFile file = EasyMock.createMock(SequentialFile.class);
+ SequentialFile file = EasyMock.createStrictMock(SequentialFile.class);
EasyMock.replay(factory, file);
- PageImpl impl = new PageImpl(factory, file);
+ PageImpl impl = new PageImpl(factory, file, 1);
EasyMock.verify(factory, file);
@@ -117,18 +117,24 @@
final byte [] expectedBytes = autoEncode((byte)'{', (int)10, (long)1, (byte)5, (byte)6, (byte)'}');
- EasyMock.expect(file.position()).andStubReturn(0);
+ if (callback)
+ {
+
+ // Expanding file (you need that on libaio)
+ EasyMock.expect(file.position()).andStubReturn(0);
- EasyMock.expect(file.size()).andReturn(0l);
-
- file.fill(0, 1024 * 1024, (byte)0);
-
- EasyMock.expect(file.size()).andReturn(1024l * 1024l);
+ EasyMock.expect(file.size()).andReturn(0l);
+
+ file.fill(0, 1024 * 1024, (byte)0);
+
+ EasyMock.expect(file.size()).andReturn(1024l * 1024l);
- file.position(0);
-
- if (callback)
- {
+ file.position(0);
+
+ // End expanding file
+
+
+
EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
public Integer answer() throws Throwable
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-07 22:25:43 UTC (rev 4783)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-08 05:08:58 UTC (rev 4784)
@@ -57,10 +57,12 @@
protected void testAdd(SequentialFileFactory factory, int numberOfElements) throws Exception
{
- SequentialFile file = factory.createSequentialFile("testPage.page", 1);
+ SequentialFile file = factory.createSequentialFile("00010.page", 1);
- PageImpl impl = new PageImpl(factory, file);
+ PageImpl impl = new PageImpl(factory, file, 10);
+ assertEquals(10, impl.getPageId());
+
impl.open();
assertEquals(1, factory.listFiles("page").size());
@@ -88,17 +90,23 @@
msg.setDestination(simpleDestination);
impl.write(msg);
+
+ assertEquals(i + 1, impl.getNumberOfMessages());
}
impl.sync();
impl.close();
- file = factory.createSequentialFile("testPage.page", 1);
+ file = factory.createSequentialFile("00010.page", 1);
file.open();
- impl = new PageImpl(factory, file);
+ impl = new PageImpl(factory, file, 10);
ServerMessage msgs[] = impl.read();
+ assertEquals(numberOfElements, msgs.length);
+
+ assertEquals(numberOfElements, impl.getNumberOfMessages());
+
for (int i = 0; i < msgs.length; i++)
{
assertEquals((long)i, msgs[i].getMessageID());
@@ -107,8 +115,6 @@
assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getBody().array());
}
-
- assertEquals(numberOfElements, msgs.length);
impl.delete();
More information about the jboss-cvs-commits
mailing list