[jboss-cvs] JBoss Messaging SVN: r4790 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 8 15:30:38 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-08 15:30:38 -0400 (Fri, 08 Aug 2008)
New Revision: 4790
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Log:
PagingStore implementation and more tests
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-08 13:02:26 UTC (rev 4789)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-08 19:30:38 UTC (rev 4790)
@@ -37,8 +37,12 @@
public interface PagingStore
{
+ int getNumberOfPages();
+
String getStoreName();
+ void sync() throws Exception;
+
void writeOnCurrentPage(ServerMessage message) throws Exception;
/**
@@ -46,8 +50,9 @@
* The file will still exist until Page.delete is called,
* So, in case the system is reloaded, the same Page will be loaded back if delete has not been called.
* @return
+ * @throws Exception
*/
- Page dequeuePage();
+ Page dequeuePage() throws Exception;
void init() throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-08 13:02:26 UTC (rev 4789)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-08 19:30:38 UTC (rev 4790)
@@ -34,7 +34,12 @@
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.server.ServerMessage;
-public class PagingStoreImpl implements PagingStore
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PagingStoreImpl implements PagingStore, TestSupportPageStore
{
// Constants -----------------------------------------------------
@@ -73,22 +78,97 @@
// PagingStore implementation ------------------------------------
+
+ public int getNumberOfPages()
+ {
+ return numberOfPages;
+ }
+
public String getStoreName()
{
return storeName;
}
- public Page dequeuePage()
+ public Page dequeuePage() throws Exception
{
- // TODO Auto-generated method stub
- return null;
+
+ lock.writeLock().lock();
+
+ try
+ {
+
+ if (numberOfPages == 0)
+ {
+ return null;
+ }
+ else
+ {
+
+ numberOfPages--;
+
+ final Page returnPage;
+ if (currentPageId == firstPageId)
+ {
+ if (currentPage != null)
+ {
+ returnPage = currentPage;
+ returnPage.close();
+ currentPage = null;
+ }
+ else
+ {
+ returnPage = createPage(currentPageId);
+ }
+
+ firstPageId = Integer.MAX_VALUE;
+
+ return returnPage;
+ }
+ else
+ {
+ returnPage = createPage(firstPageId++);
+ }
+
+ return returnPage;
+ }
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+
}
public void writeOnCurrentPage(ServerMessage message) throws Exception
{
checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+
+ lock.readLock().lock();
+
+ try
+ {
+ currentPage.write(message);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
}
+ public void sync() throws Exception
+ {
+ lock.readLock().lock();
+
+ try
+ {
+ currentPage.sync();
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
public void init() throws Exception
{
@@ -97,7 +177,7 @@
try
{
- List<String> files = factory.listFiles(".page");
+ List<String> files = factory.listFiles("page");
numberOfPages = files.size();
@@ -124,6 +204,14 @@
+ // TestSupportPageStore ------------------------------------------
+
+ public void forceAnotherPage() throws Exception
+ {
+ openNewPage();
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -134,7 +222,7 @@
private synchronized void checkPage(int bytesToWrite) throws Exception
{
- if (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0)
+ if (currentPage == null || (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0))
{
openNewPage();
}
@@ -149,14 +237,19 @@
numberOfPages++;
currentPageId++;
+ if (currentPageId < firstPageId)
+ {
+ firstPageId = currentPageId;
+ }
+
if (currentPage != null)
{
currentPage.close();
}
- SequentialFile file = factory.createSequentialFile(createFileName(currentPageId), 1000);
+ currentPage = createPage(currentPageId);
- currentPage = new PageImpl(factory, file, currentPageId);
+ currentPage.open();
}
finally
@@ -164,13 +257,24 @@
lock.writeLock().unlock();
}
}
+
+
+ private Page createPage(int page) throws Exception
+ {
+ String fileName = createFileName(page);
+ SequentialFile file = factory.createSequentialFile(fileName, 1000);
+
+ return new PageImpl(factory, file, page);
+ }
/**
*
+ * Note: DecimalFormat is not thread-safe; callers must hold appropriate synchronization before calling this method.
+ *
* @param pageID
* @return
*/
- private synchronized String createFileName(int pageID)
+ private String createFileName(int pageID)
{
return format.format(pageID) + ".page";
}
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-08-08 19:30:38 UTC (rev 4790)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.paging.impl;
+
+import org.jboss.messaging.core.paging.PagingStore;
+
+/**
+ * Extra methods required only by the test cases for PagingStoreImpl.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface TestSupportPageStore extends PagingStore
+{
+ void forceAnotherPage() throws Exception;
+}
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-08 19:30:38 UTC (rev 4790)
@@ -0,0 +1,247 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+public class PagingStoreImplTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testStore() throws Exception
+ {
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ ByteBuffer buffer = createRandomBuffer(10);
+
+ buffers.add(buffer);
+ SimpleString destination = new SimpleString("test");
+
+ ServerMessage msg = createMessage(1l, destination, buffer);
+
+ storeImpl.writeOnCurrentPage(msg);
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+ storeImpl.sync();
+
+ storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+
+ storeImpl.init();
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+ }
+
+ public void testDepageOnCurrentPage() throws Exception
+ {
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ SimpleString destination = new SimpleString("test");
+
+ for (int i = 0; i < 10; i++)
+ {
+
+ ByteBuffer buffer = createRandomBuffer(10);
+
+ buffers.add(buffer);
+
+ ServerMessage msg = createMessage(i+1l, destination, buffer);
+
+ storeImpl.writeOnCurrentPage(msg);
+ }
+
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+ storeImpl.sync();
+
+ Page page = storeImpl.dequeuePage();
+
+
+ page.open();
+
+ ServerMessage msg[] = page.read();
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ assertEquals(10, msg.length);
+
+ for (int i = 0; i < 10; i++)
+ {
+ assertEquals(i + 1l, msg[i].getMessageID());
+ assertEqualsByteArrays(buffers.get(i).array(), msg[i].getBody().array());
+ }
+
+ }
+
+ public void testDepageMultiplPages() throws Exception
+ {
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ SimpleString destination = new SimpleString("test");
+
+ for (int i = 0; i < 10; i++)
+ {
+
+ ByteBuffer buffer = createRandomBuffer(10);
+
+ buffers.add(buffer);
+
+ if (i == 5)
+ {
+ storeImpl.forceAnotherPage();
+ }
+
+
+ ServerMessage msg = createMessage(i+1l, destination, buffer);
+
+ storeImpl.writeOnCurrentPage(msg);
+ }
+
+
+ assertEquals(2, storeImpl.getNumberOfPages());
+
+ storeImpl.sync();
+
+ for (int pageNr = 0; pageNr < 2; pageNr++)
+ {
+ Page page = storeImpl.dequeuePage();
+
+ page.open();
+
+ ServerMessage msg[] = page.read();
+
+ page.close();
+
+ assertEquals(5, msg.length);
+
+ for (int i = 0; i < 5; i++)
+ {
+ assertEquals(pageNr*5 + i + 1l, msg[i].getMessageID());
+ assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getBody().array());
+ }
+ }
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ ServerMessage msg = createMessage(100, destination, buffers.get(0));
+
+ storeImpl.writeOnCurrentPage(msg);
+
+ Page page = storeImpl.dequeuePage();
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ page.open();
+
+ ServerMessage msgs[] = page.read();
+
+ assertEquals(1, msgs.length);
+
+ assertEquals(100l, msgs[0].getMessageID());
+
+ assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getBody().array());
+
+ assertNull(storeImpl.dequeuePage());
+
+
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private ServerMessage createMessage(long messageId,
+ SimpleString destination, ByteBuffer buffer)
+ {
+ ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+ System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+
+ msg.setMessageID((long)messageId);
+
+ msg.setDestination(destination);
+ return msg;
+ }
+
+ private ByteBuffer createRandomBuffer(int size)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+
+ for (int j = 0; j < buffer.limit(); j++)
+ {
+ buffer.put(RandomUtil.randomByte());
+ }
+ return buffer;
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list