[jboss-cvs] JBoss Messaging SVN: r4793 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/journal/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 12 09:55:30 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-12 09:55:30 -0400 (Tue, 12 Aug 2008)
New Revision: 4793
Added:
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Paging store concurrency tests
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -48,4 +48,6 @@
int getAlignment();
+ int calculateBlockSize(int bytes) throws Exception;
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -77,4 +77,13 @@
newbuffer.put(bytes);
return newbuffer;
}
+
+ public int calculateBlockSize(int position) throws Exception
+ {
+ int alignment = getAlignment();
+
+ int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+ return pos;
+ }
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -67,6 +67,11 @@
{
return 1;
}
+
+ public int calculateBlockSize(int bytes) throws Exception
+ {
+ return bytes;
+ }
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -41,9 +41,11 @@
String getStoreName();
+ void startPage() throws Exception;
+
void sync() throws Exception;
- void writeOnCurrentPage(ServerMessage message) throws Exception;
+ boolean writeOnCurrentPage(ServerMessage message) throws Exception;
/**
* Remove the first page from the Writing Queue.
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -64,7 +64,6 @@
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
private final PagingCallback callback;
- private volatile long preallocSize = -1;
private final AtomicInteger size = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -158,7 +157,6 @@
if (callback != null)
{
callback.countUp();
- expandIfNeeded(buffer.limit());
file.write(buffer, callback);
}
else
@@ -187,7 +185,7 @@
{
file.open();
this.size.set((int)file.size());
- file.position((int)file.size());
+ file.position(0);
}
public void close() throws Exception
@@ -227,27 +225,6 @@
// Private -------------------------------------------------------
- // only used on AIO
- // (This method might go away if we decide to not allow AIO on paging.)
- private synchronized void expandIfNeeded(final int bytesToWrite) throws Exception
- {
- if (preallocSize < 0)
- {
- preallocSize = file.size();
- }
-
- while (file.position() + bytesToWrite > preallocSize)
- {
- final int position = file.position();
-
- file.fill((int)preallocSize, 1024*1024, (byte)0);
-
- preallocSize = file.size();
-
- file.position(position);
- }
- }
-
// Inner classes -------------------------------------------------
private static class PagingCallback implements IOCallback
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -25,6 +25,8 @@
import java.text.DecimalFormat;
import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,6 +50,8 @@
private final DecimalFormat format = new DecimalFormat("000000000");
+ private final AtomicInteger pageUsedSize = new AtomicInteger(0);
+
private final String storeName;
private final SequentialFileFactory factory;
@@ -58,7 +62,12 @@
private volatile int firstPageId = Integer.MAX_VALUE;
private volatile int currentPageId;
private volatile Page currentPage;
+
+ // This is supposed to perform better than synchronized methods
+ private final Semaphore globalLock = new Semaphore(1);
+
private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private volatile boolean initialized = false;
// Static --------------------------------------------------------
@@ -91,7 +100,9 @@
public Page dequeuePage() throws Exception
{
+ validateInit();
+ globalLock.acquire();
lock.writeLock().lock();
try
@@ -135,28 +146,72 @@
finally
{
lock.writeLock().unlock();
+ globalLock.release();
}
}
- public void writeOnCurrentPage(ServerMessage message) throws Exception
+ public boolean writeOnCurrentPage(ServerMessage message) throws Exception
{
- checkPage(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+ validateInit();
- lock.readLock().lock();
+ int bytesToWrite = factory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+ globalLock.acquire();
+
try
{
- currentPage.write(message);
+ if (currentPage == null)
+ {
+ return false;
+ }
+
+ if ((pageUsedSize.addAndGet(bytesToWrite) > maxPageSize && currentPage.getNumberOfMessages() > 0))
+ {
+ lock.writeLock().lock();
+ try
+ {
+ openNewPage();
+ pageUsedSize.addAndGet(bytesToWrite);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+ // We must acquire the readLock before we release the globalLock,
+ // or else records could still be added to the currentPage even after its max size was already reached.
+ // (Condition tested by PagingStoreTestBase::testConcurrentPaging; otherwise the test would eventually fail, about 1 run in 100.)
+ lock.readLock().lock();
+
}
finally
{
+ globalLock.release();
+ }
+
+ try
+ {
+ if (currentPage != null)
+ {
+ currentPage.write(message);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
lock.readLock().unlock();
}
}
public void sync() throws Exception
{
+ validateInit();
+
lock.readLock().lock();
try
@@ -171,6 +226,11 @@
public void init() throws Exception
{
+
+ if (initialized)
+ {
+ throw new IllegalStateException("PageStore " + this.storeName + " already initialized!");
+ }
lock.writeLock().lock();
@@ -195,6 +255,13 @@
firstPageId = fileId;
}
}
+
+ initialized = true;
+
+ if (numberOfPages != 0)
+ {
+ startPage();
+ }
}
finally
{
@@ -202,12 +269,31 @@
}
}
+ public void startPage() throws Exception
+ {
+ validateInit();
+
+ globalLock.acquire();
+ try
+ {
+ if (currentPage == null)
+ {
+ openNewPage();
+ }
+ }
+ finally
+ {
+ globalLock.release();
+ }
+ }
+
// TestSupportPageStore ------------------------------------------
public void forceAnotherPage() throws Exception
{
+ validateInit();
openNewPage();
}
@@ -220,14 +306,6 @@
// Private -------------------------------------------------------
- private synchronized void checkPage(int bytesToWrite) throws Exception
- {
- if (currentPage == null || (currentPage.getSize() + bytesToWrite > maxPageSize && currentPage.getNumberOfMessages() > 0))
- {
- openNewPage();
- }
- }
-
private void openNewPage() throws Exception
{
lock.writeLock().lock();
@@ -249,6 +327,8 @@
currentPage = createPage(currentPageId);
+ pageUsedSize.set(0);
+
currentPage.open();
}
@@ -264,6 +344,19 @@
String fileName = createFileName(page);
SequentialFile file = factory.createSequentialFile(fileName, 1000);
+ file.open();
+
+ long size = file.size();
+
+ if (factory.isSupportsCallbacks() && size < maxPageSize)
+ {
+ file.fill((int)size, (int)maxPageSize - (int)size, (byte)0);
+ }
+
+ file.position(0);
+
+ file.close();
+
return new PageImpl(factory, file, page);
}
@@ -284,6 +377,14 @@
return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
}
+ private void validateInit()
+ {
+ if (!initialized)
+ {
+ throw new IllegalStateException("PagingStore " + this.storeName + " not initialized!");
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -78,7 +78,7 @@
protected void tearDown() throws Exception
{
super.tearDown();
- //deleteDirectory(new File(journalDir));
+ deleteDirectory(new File(journalDir));
}
// Private -------------------------------------------------------
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.io.File;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.paging.impl.PagingStoreTestBase;
+
+public class PagingStoreIntegrationTest extends PagingStoreTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/journal-test";
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testPageStoreWithAIO() throws Exception
+ {
+ if (!AsynchronousFileImpl.isLoaded())
+ {
+ fail(String.format("libAIO is not loaded on %s %s %s",
+ System.getProperty("os.name"),
+ System.getProperty("os.arch"),
+ System.getProperty("os.version")));
+ }
+ testConcurrentPaging(new AIOSequentialFileFactory(journalDir), 10);
+ }
+
+ public void testPageWithNIO() throws Exception
+ {
+ // This integration test could fail 1 in 100 due to race conditions.
+ for (int i = 0; i < 100; i++)
+ {
+ recreateDirectory();
+ System.out.println("Test " + i);
+ testConcurrentPaging(new NIOSequentialFileFactory(journalDir), 10);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ recreateDirectory();
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ //deleteDirectory(new File(journalDir));
+ }
+
+ // Private -------------------------------------------------------
+
+ private void recreateDirectory()
+ {
+ File fileJournalDir = new File(journalDir);
+ deleteDirectory(fileJournalDir);
+ fileJournalDir.mkdirs();
+ }
+
+ // Inner classes -------------------------------------------------
+
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -140,6 +140,15 @@
return ByteBuffer.allocateDirect(size);
}
+ public int calculateBlockSize(int position) throws Exception
+ {
+ int alignment = getAlignment();
+
+ int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+ return pos;
+ }
+
public ByteBuffer wrapBuffer(byte[] bytes)
{
return ByteBuffer.wrap(bytes);
@@ -375,7 +384,7 @@
return data.position();
}
- public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public synchronized int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
if (!open)
{
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -119,22 +119,6 @@
if (callback)
{
-
- // Expanding file (you need that on libaio)
- EasyMock.expect(file.position()).andStubReturn(0);
-
- EasyMock.expect(file.size()).andReturn(0l);
-
- file.fill(0, 1024 * 1024, (byte)0);
-
- EasyMock.expect(file.size()).andReturn(1024l * 1024l);
-
- file.position(0);
-
- // End expanding file
-
-
-
EasyMock.expect(file.write(compareByteBuffer(expectedBytes), EasyMock.isA(IOCallback.class))).andAnswer(new IAnswer<Integer>(){
public Integer answer() throws Throwable
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -36,6 +36,11 @@
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
public abstract class PageImplTestBase extends UnitTestCase
{
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-12 12:53:32 UTC (rev 4792)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -32,15 +32,16 @@
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.tests.util.RandomUtil;
-import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
-public class PagingStoreImplTest extends UnitTestCase
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class PagingStoreImplTest extends PagingStoreTestBase
{
// Constants -----------------------------------------------------
@@ -59,8 +60,14 @@
PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+ storeImpl.init();
+
assertEquals(0, storeImpl.getNumberOfPages());
+ storeImpl.startPage();
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
ByteBuffer buffer = createRandomBuffer(10);
@@ -70,7 +77,7 @@
ServerMessage msg = createMessage(1l, destination, buffer);
- storeImpl.writeOnCurrentPage(msg);
+ assertTrue(storeImpl.writeOnCurrentPage(msg));
assertEquals(1, storeImpl.getNumberOfPages());
@@ -80,7 +87,7 @@
storeImpl.init();
- assertEquals(1, storeImpl.getNumberOfPages());
+ assertEquals(2, storeImpl.getNumberOfPages());
}
@@ -90,8 +97,12 @@
PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+ storeImpl.init();
+
assertEquals(0, storeImpl.getNumberOfPages());
+ storeImpl.startPage();
+
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
SimpleString destination = new SimpleString("test");
@@ -105,7 +116,7 @@
ServerMessage msg = createMessage(i+1l, destination, buffer);
- storeImpl.writeOnCurrentPage(msg);
+ assertTrue(storeImpl.writeOnCurrentPage(msg));
}
@@ -132,14 +143,20 @@
}
- public void testDepageMultiplPages() throws Exception
+ public void testDepageMultiplePages() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+ storeImpl.init();
+
assertEquals(0, storeImpl.getNumberOfPages());
+ storeImpl.startPage();
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
SimpleString destination = new SimpleString("test");
@@ -159,7 +176,7 @@
ServerMessage msg = createMessage(i+1l, destination, buffer);
- storeImpl.writeOnCurrentPage(msg);
+ assertTrue(storeImpl.writeOnCurrentPage(msg));
}
@@ -190,8 +207,12 @@
ServerMessage msg = createMessage(100, destination, buffers.get(0));
- storeImpl.writeOnCurrentPage(msg);
+ assertFalse(storeImpl.writeOnCurrentPage(msg));
+ storeImpl.startPage();
+
+ assertTrue(storeImpl.writeOnCurrentPage(msg));
+
Page page = storeImpl.dequeuePage();
assertEquals(0, storeImpl.getNumberOfPages());
@@ -211,35 +232,14 @@
}
-
- // Package protected ---------------------------------------------
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private ServerMessage createMessage(long messageId,
- SimpleString destination, ByteBuffer buffer)
+ public void testConcurrentDepage() throws Exception
{
- ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
- System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+ SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
- msg.setMessageID((long)messageId);
+ testConcurrentPaging(factory, 10);
- msg.setDestination(destination);
- return msg;
}
-
- private ByteBuffer createRandomBuffer(int size)
- {
- ByteBuffer buffer = ByteBuffer.allocate(size);
-
- for (int j = 0; j < buffer.limit(); j++)
- {
- buffer.put(RandomUtil.randomByte());
- }
- return buffer;
- }
// Inner classes -------------------------------------------------
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-12 13:55:30 UTC (rev 4793)
@@ -0,0 +1,320 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public abstract class PagingStoreTestBase extends UnitTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testConcurrentPaging(SequentialFileFactory factory, final int numberOfThreads) throws Exception,
+ InterruptedException
+ {
+
+ final int MAX_SIZE = 1024 * 10;
+
+ final AtomicLong messageIdGenerator = new AtomicLong(0);
+
+ final AtomicInteger aliveProducers = new AtomicInteger(numberOfThreads);
+
+ final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
+
+ final ConcurrentHashMap<Long, ServerMessage> buffers = new ConcurrentHashMap<Long, ServerMessage>();
+
+ final ArrayList<Page> readPages = new ArrayList<Page>();
+
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", MAX_SIZE);
+
+ storeImpl.init();
+
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ storeImpl.startPage();
+
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+
+
+
+ final SimpleString destination = new SimpleString("test");
+
+ class ProducerThread extends Thread
+ {
+
+ Exception e;
+
+ public void run()
+ {
+
+ try
+ {
+ boolean firstTime = true;
+ while (true)
+ {
+ long id = messageIdGenerator.incrementAndGet();
+ ServerMessage msg = createMessage(id, destination, createRandomBuffer(5));
+ if (storeImpl.writeOnCurrentPage(msg))
+ {
+ buffers.put(id, msg);
+ }
+ else
+ {
+ break;
+ }
+
+ if (firstTime)
+ {
+ latchStart.countDown();
+ firstTime = false;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ this.e = e;
+ }
+ finally
+ {
+ aliveProducers.decrementAndGet();
+ }
+ }
+ }
+
+ class ConsumerThread extends Thread
+ {
+ Exception e;
+
+ public void run()
+ {
+ try
+ {
+ // Wait for every producer to produce at least one message
+ latchStart.await();
+ while (aliveProducers.get() > 0)
+ {
+ Page page = storeImpl.dequeuePage();
+ if (page != null)
+ {
+ readPages.add(page);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ this.e = e;
+ }
+ }
+ }
+
+ ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ producerThread[i] = new ProducerThread();
+ producerThread[i].start();
+ }
+
+ ConsumerThread consumer = new ConsumerThread();
+ consumer.start();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ producerThread[i].join();
+ if (producerThread[i].e != null)
+ {
+ throw producerThread[i].e;
+ }
+ }
+
+ consumer.join();
+
+ if (consumer.e != null)
+ {
+ throw consumer.e;
+ }
+
+ System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
+
+ final ConcurrentHashMap<Long, ServerMessage> buffers2 = new ConcurrentHashMap<Long, ServerMessage>();
+
+ for (Page page: readPages)
+ {
+ page.open();
+ ServerMessage msgs[] = page.read();
+ page.close();
+
+ for (ServerMessage msg : msgs)
+ {
+ ServerMessage msgWritten = buffers.remove(msg.getMessageID());
+ buffers2.put(msg.getMessageID(), msg);
+ assertNotNull(msgWritten);
+ assertEquals (msg.getDestination(), msgWritten.getDestination());
+ assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+ }
+ }
+
+ assertEquals (0, buffers.size());
+
+ List<String> files = factory.listFiles("page");
+
+ assertTrue(files.size() != 0);
+
+ for (String file: files)
+ {
+ SequentialFile fileTmp = factory.createSequentialFile(file, 1);
+ fileTmp.open();
+ assertTrue (fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
+ fileTmp.close();
+ }
+
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, "test", MAX_SIZE);
+ storeImpl2.init();
+
+ int numberOfPages = storeImpl2.getNumberOfPages();
+ assertTrue(numberOfPages != 0);
+
+ storeImpl2.startPage();
+
+
+
+ storeImpl2.startPage();
+
+
+ assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
+
+ long lastMessageId = messageIdGenerator.incrementAndGet();
+ ServerMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+
+ storeImpl2.writeOnCurrentPage(lastMsg);
+ buffers2.put(lastMessageId, lastMsg);
+
+ Page lastPage = null;
+ while (true)
+ {
+ Page page = storeImpl2.dequeuePage();
+ if (page == null)
+ {
+ break;
+ }
+
+ lastPage = page;
+
+ page.open();
+
+ ServerMessage[] msgs = page.read();
+
+ page.close();
+
+ for (ServerMessage msg: msgs)
+ {
+
+ ServerMessage msgWritten = buffers2.remove(msg.getMessageID());
+ assertNotNull(msgWritten);
+ assertEquals (msg.getDestination(), msgWritten.getDestination());
+ assertEqualsByteArrays(msgWritten.getBody().array(), msg.getBody().array());
+ }
+ }
+
+
+ lastPage.open();
+ ServerMessage lastMessages[] = lastPage.read();
+ lastPage.close();
+ assertEquals(1, lastMessages.length);
+
+ assertEquals(lastMessages[0].getMessageID(), lastMessageId);
+ assertEqualsByteArrays(lastMessages[0].getBody().array(), lastMsg.getBody().array());
+
+ System.out.println("Last Message ID = " + lastMsg.getMessageID());
+
+
+ assertEquals(0, buffers2.size());
+
+
+ }
+
+ protected ServerMessage createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+ {
+ ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
+ System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
+
+ msg.setMessageID((long)messageId);
+
+ msg.setDestination(destination);
+ return msg;
+ }
+
+ protected ByteBuffer createRandomBuffer(int size)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+
+ for (int j = 0; j < buffer.limit(); j++)
+ {
+ buffer.put(RandomUtil.randomByte());
+ }
+ return buffer;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list