[jboss-cvs] JBoss Messaging SVN: r4677 - in trunk: src/main/org/jboss/messaging/core/journal and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 11 21:14:16 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-11 21:14:15 -0400 (Fri, 11 Jul 2008)
New Revision: 4677
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
Removed:
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/
trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
Some journal work (tests, fixes and improvements)
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/config/jbm-configuration.xml 2008-07-12 01:14:15 UTC (rev 4677)
@@ -101,8 +101,9 @@
- closing Asynchronous files
- Transaction awaits
- Awaits on non transactional writes
+ This shouldn't be higher than the remoting timeout
-->
- <journal-aio-timeout>60000</journal-aio-timeout>
+ <journal-aio-timeout>4000</journal-aio-timeout>
<journal-task-period>5000</journal-task-period>
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -42,8 +42,10 @@
void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
- void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
-
+ void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
+
void appendDeleteRecord(long id) throws Exception;
// Transactional operations
@@ -54,8 +56,10 @@
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
void appendDeleteRecordTransactional(long txID, long id) throws Exception;
void appendCommitRecord(long txID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -45,5 +45,7 @@
// Avoid using this method in production as it creates an unecessary copy
ByteBuffer wrapBuffer(byte[] bytes);
+
+ int getAlignment();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -62,5 +62,7 @@
int getMaxAIO();
- long getAIOTimeout();
+ long getAIOTimeout();
+
+ void forceMoveNextFile() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -238,6 +238,10 @@
}
}
+ public String toString()
+ {
+ return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;
+ }
// Private methods
// -----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -65,11 +65,16 @@
return ByteBuffer.allocateDirect(size);
}
+ public int getAlignment()
+ {
+ return 512;
+ }
+
// For tests only
public ByteBuffer wrapBuffer(final byte[] bytes)
{
ByteBuffer newbuffer = newBuffer(bytes.length);
newbuffer.put(bytes);
return newbuffer;
- };
}
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -173,13 +173,13 @@
private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
- private final ExecutorService closingExecutor = Executors.newSingleThreadExecutor();
+ private ExecutorService closingExecutor = null;
/**
* We have a separated executor for open, as if we used the same executor this would still represent
* a point of wait between the closing and open.
* */
- private final ExecutorService openExecutor = Executors.newSingleThreadExecutor();
+ private ExecutorService openExecutor = null;
/*
* We use a semaphore rather than synchronized since it performs better when
@@ -187,6 +187,7 @@
*/
//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
+ // this locks access to currentFile
private final Semaphore lock = new Semaphore(1, true);
private volatile JournalFile currentFile ;
@@ -314,37 +315,68 @@
posFilesMap.put(id, new PosFiles(usedFile));
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- PosFiles posFiles = posFilesMap.get(id);
-
- if (posFiles == null)
- {
- throw new IllegalStateException("Cannot find add info " + id);
- }
-
- int size = SIZE_UPDATE_RECORD + record.length;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(UPDATE_RECORD);
- bb.putLong(id);
- bb.put(recordType);
- bb.putInt(record.length);
- bb.put(record);
- bb.put(DONE);
- bb.rewind();
-
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ PosFiles posFiles = posFilesMap.get(id);
+
+ if (posFiles == null)
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
+
+ int size = SIZE_UPDATE_RECORD + record.length;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(UPDATE_RECORD);
+ bb.putLong(id);
+ bb.put(recordType);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
+ bb.rewind();
+
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
- posFiles.addUpdateFile(usedFile);
- }
-
+
+ posFiles.addUpdateFile(usedFile);
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ PosFiles posFiles = posFilesMap.get(id);
+
+ if (posFiles == null)
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
+
+ int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+
+ bb.putByte(UPDATE_RECORD);
+ bb.putLong(id);
+ bb.putByte(recordType);
+ bb.putInt(record.getEncodeSize());
+ record.encode(bb);
+ bb.putByte(DONE);
+ bb.rewind();
+
+ JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+
+ posFiles.addUpdateFile(usedFile);
+ }
+
public void appendDeleteRecord(long id) throws Exception
{
if (state != STATE_LOADED)
@@ -359,8 +391,6 @@
throw new IllegalStateException("Cannot find add info " + id);
}
- posFiles.addDelete(currentFile);
-
int size = SIZE_DELETE_RECORD;
ByteBuffer bb = fileFactory.newBuffer(size);
@@ -370,7 +400,8 @@
bb.put(DONE);
bb.rewind();
- appendRecord(bb, syncNonTransactional, null);
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+ posFiles.addDelete(usedFile);
}
public long getTransactionID()
@@ -439,35 +470,65 @@
tx.addPos(usedFile, id);
}
- public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_UPDATE_RECORD_TX + record.length;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(UPDATE_RECORD_TX);
- bb.putLong(txID);
- bb.put(recordType);
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(record);
- bb.put(DONE);
- bb.rewind();
-
+ public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_UPDATE_RECORD_TX + record.length;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(UPDATE_RECORD_TX);
+ bb.putLong(txID);
+ bb.put(recordType);
+ bb.putLong(id);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
+ bb.rewind();
+
JournalFile usedFile;
usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
+
TransactionNegPos tx = getTransactionInfo(txID);
- tx.addPos(usedFile, id);
- }
-
+ tx.addPos(usedFile, id);
+ }
+
+ public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+
+
+ bb.putByte(UPDATE_RECORD_TX);
+ bb.putLong(txID);
+ bb.putByte(recordType);
+ bb.putLong(id);
+ bb.putInt(record.getEncodeSize());
+ record.encode(bb);
+ bb.putByte(DONE);
+ bb.rewind();
+
+ JournalFile usedFile;
+
+ usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+
+ TransactionNegPos tx = getTransactionInfo(txID);
+
+ tx.addPos(usedFile, id);
+ }
+
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
if (state != STATE_LOADED)
@@ -647,7 +708,7 @@
for (JournalFile file: orderedFiles)
{
- file.getFile().open();//aki
+ file.getFile().open();
ByteBuffer bb = fileFactory.newBuffer(fileSize);
@@ -1112,6 +1173,9 @@
//Reverse the refs
transactionInfo.forget();
+
+ // Remove the transactionInfo
+ transactionInfos.remove(transaction.transactionID);
}
else
{
@@ -1132,7 +1196,7 @@
public int getAlignment() throws Exception
{
- return this.currentFile.getFile().getAlignment();
+ return this.fileFactory.getAlignment();
}
public synchronized void checkReclaimStatus() throws Exception
@@ -1158,12 +1222,25 @@
}
}
+ for (JournalFile file: freeFiles)
+ {
+ builder.append("FreeFile:" + file + "\n");
+ }
+
builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
- builder.append(((JournalFileImpl)currentFile).debug());
+
+ if (currentFile instanceof JournalFileImpl)
+ {
+ builder.append(((JournalFileImpl)currentFile).debug());
+ }
+
+ builder.append("#Opened Files:" + this.openedFiles.size());
return builder.toString();
}
+ // TestableJournal implementation --------------------------------------------------------------
+
/** Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
@@ -1173,7 +1250,7 @@
callback.waitCompletion(aioTimeout);
}
- if (!closingExecutor.isShutdown())
+ if (closingExecutor != null && !closingExecutor.isShutdown())
{
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
@@ -1189,7 +1266,7 @@
latch.await();
}
- if (!openExecutor.isShutdown())
+ if (openExecutor != null && !openExecutor.isShutdown())
{
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
@@ -1207,8 +1284,6 @@
}
- // TestableJournal implementation --------------------------------------------------------------
-
public synchronized void checkAndReclaimFiles() throws Exception
{
checkReclaimStatus();
@@ -1219,20 +1294,23 @@
{
//File can be reclaimed or deleted
- if (trace) log.trace("Reclaiming file " + file);
+ if (trace) log.trace("Reclaiming file " + file);
+ log.info("Reclaiming file " + file); // remove this
dataFiles.remove(file);
//FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 < minFiles)
- {
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
//Re-initialise it
long newOrderingID = generateOrderingID();
SequentialFile sf = file.getFile();
- sf.open();
+ log.info("Adding " + sf + "to freeFiles"); // remove this
+
+ sf.open();
ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
@@ -1258,6 +1336,8 @@
}
else
{
+ log.info("Deleting " + file.getFile()); // remove this
+
file.getFile().open();
file.getFile().delete();
@@ -1330,7 +1410,24 @@
{
return aioTimeout;
}
-
+
+ // In some tests we need to force the journal to move to a next file
+ public void forceMoveNextFile() throws Exception
+ {
+ lock.acquire();
+
+ try
+ {
+ moveNextFile();
+ }
+ finally
+ {
+ lock.release();
+ }
+
+ debugWait();
+ }
+
// MessagingComponent implementation ---------------------------------------------------
public synchronized boolean isStarted()
@@ -1345,6 +1442,9 @@
throw new IllegalStateException("Journal is not stopped");
}
+ this.openExecutor = Executors.newSingleThreadExecutor();
+ this.closingExecutor = Executors.newSingleThreadExecutor();
+
state = STATE_STARTED;
}
@@ -1358,7 +1458,7 @@
stopReclaimer();
closingExecutor.shutdown();
- if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.SECONDS))
+ if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
{
throw new IllegalStateException("Time out waiting for closing executor to finish");
}
@@ -1369,12 +1469,11 @@
}
openExecutor.shutdown();
- if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.SECONDS))
+ if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
{
throw new IllegalStateException("Time out waiting for open executor to finish");
}
-
for (JournalFile file: openedFiles)
{
file.getFile().close();
@@ -1522,6 +1621,7 @@
return orderingID;
}
+ // You need to guarantee lock.acquire() over currentFile before calling this method
private void checkFile(final int size) throws Exception
{
if (size % currentFile.getFile().getAlignment() != 0)
@@ -1537,21 +1637,21 @@
if (currentFile == null || fileSize - currentFile.getOffset() < size)
{
- closeFile(currentFile);
+ moveNextFile();
- enqueueOpenFile();
-
- currentFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
-
- if (currentFile == null)
- {
- throw new IllegalStateException("Timed out waiting for an opened file");
- }
-
}
}
- private void enqueueOpenFile()
+ // You need to guarantee lock.acquire() before calling this method
+ private void moveNextFile() throws InterruptedException
+ {
+ closeFile(currentFile);
+
+ currentFile = enqueueOpenFile();
+ }
+
+ // You need to guarantee lock.acquire() before calling this method
+ private JournalFile enqueueOpenFile() throws InterruptedException
{
if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
openExecutor.execute(new Runnable()
@@ -1568,12 +1668,21 @@
}
}
});
+
+ JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
+
+ if (nextFile == null)
+ {
+ throw new IllegalStateException("Timed out waiting for an opened file");
+ }
+
+ return nextFile;
}
/**
*
- * Open a file an place it into the openedFiles queue
+ * Open a file and place it into the openedFiles queue
* */
private void pushOpenedFile() throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -63,5 +63,10 @@
return ByteBuffer.wrap(bytes);
}
+ public int getAlignment()
+ {
+ return 1;
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.Journal;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
@@ -199,7 +200,7 @@
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
- byte[] record = ackBytes(queueID, messageID);
+ EncodingSupport record = ackBytes(queueID, messageID);
messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, record);
}
@@ -218,7 +219,7 @@
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
{
- byte[] record = ackBytes(queueID, messageID);
+ EncodingSupport record = ackBytes(queueID, messageID);
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, record);
}
@@ -601,17 +602,9 @@
// Private ----------------------------------------------------------------------------------
- private byte[] ackBytes(final long queueID, final long messageID)
+ private EncodingSupport ackBytes(final long queueID, final long messageID)
{
- byte[] record = new byte[SIZE_LONG + SIZE_LONG];
-
- ByteBuffer bb = ByteBuffer.wrap(record);
-
- bb.putLong(queueID);
-
- bb.putLong(messageID);
-
- return record;
+ return new ACKRecord(queueID, messageID);
}
private void checkAndCreateDir(String dir, boolean create)
@@ -643,5 +636,40 @@
log.info("Directory " + dir + " already exists");
}
}
+
+ // Inner Classes ----------------------------------------------------------------------------
+ class ACKRecord implements EncodingSupport
+ {
+ private long queueID;
+ private long messageID;
+
+
+
+ public ACKRecord(long queueID, long messageID)
+ {
+ super();
+ this.queueID = queueID;
+ this.messageID = messageID;
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putLong(queueID);
+ buffer.putLong(messageID);
+ }
+
+ public int getEncodeSize()
+ {
+ return SIZE_LONG * 2;
+ }
+
+ }
+
+
}
Deleted: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/ant/JUnitTestSuiteListener.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -1,119 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.tools.ant;
-
-import java.io.OutputStream;
-
-import junit.framework.AssertionFailedError;
-import junit.framework.Test;
-
-import org.apache.tools.ant.BuildException;
-import org.apache.tools.ant.taskdefs.optional.junit.JUnitResultFormatter;
-import org.apache.tools.ant.taskdefs.optional.junit.JUnitTest;
-
-/**
- * This class is a hack.
- *
- * I needed a way to intercept the end of a forked ant JUnit test run, in order to perform some
- * clean-up, and this is it: register this class as a JUnit batchtest formatter, and it will get
- * notified on a endTestSuite() event. Very important, it is run in the same address space as the
- * tests themselves.
- *
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class JUnitTestSuiteListener implements JUnitResultFormatter
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // JUnitResultFormatter implementation ---------------------------
-
- public void endTestSuite(JUnitTest suite) throws BuildException
- {
-// try
-// {
-// List destroyed = ServerManagement.destroySpawnedServers();
-// if (destroyed.size() > 0)
-// {
-// StringBuffer sb = new StringBuffer("Destroyed spawned test servers ");
-// for(Iterator i = destroyed.iterator(); i.hasNext();)
-// {
-// sb.append(i.next());
-// if (i.hasNext())
-// {
-// sb.append(',');
-// }
-// }
-// System.out.println(sb);
-// }
-// }
-// catch(Throwable t)
-// {
-// t.printStackTrace();
-// }
- }
-
- public void startTestSuite(JUnitTest suite) throws BuildException
- {
- // noop
- }
-
- public void setOutput(OutputStream out)
- {
- // noop
- }
-
- public void setSystemOutput(String out)
- {
- // noop
- }
-
- public void setSystemError(String err)
- {
- // noop
- }
-
- // TestListener implementation -----------------------------------
-
- public void addError(Test test, Throwable t)
- {
- // noop
- }
-
- public void addFailure(Test test, AssertionFailedError t)
- {
- // noop
- }
-
- public void endTest(Test test)
- {
- // noop
- }
-
- public void startTest(Test test)
- {
- // noop
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Property changes on: trunk/tests/src/org/jboss/messaging/tests
___________________________________________________________________
Name: svn:ignore
+ local
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -174,7 +174,7 @@
double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
- log.debug("Rate of " + rate + " adds/removes per sec");
+ log.info("Rate of " + rate + " adds/removes per sec");
log.debug("Reclaim status = " + debugJournal());
@@ -238,13 +238,13 @@
for (double rate: rates)
{
- log.debug("Transaction Rate = " + rate + " records/sec");
+ log.info("Transaction Rate = " + rate + " records/sec");
}
double rate = 1000 * (double)numMessages / (end - start);
- log.debug("Rate " + rate + " records/sec");
+ log.info("Rate " + rate + " records/sec");
}
finally
{
@@ -285,7 +285,7 @@
double rate = 1000 * (double)numMessages / (end - start);
- log.debug("Rate " + rate + " records/sec");
+ log.info("Rate " + rate + " records/sec");
journal.stop();
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -0,0 +1,462 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class AlignedJournalImplTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int alignment = 0;
+
+ private FakeSequentialFileFactory factory;
+
+ JournalImpl journalImpl = null;
+
+ private ArrayList<RecordInfo> records = null;
+
+ private ArrayList<PreparedTransactionInfo> transactions = null;
+
+
+ // Static --------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(AlignedJournalImplTest.class);
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // This test just validates basic alignment on the FakeSequentialFile itself
+ public void testBasicAlignment() throws Exception
+ {
+
+ FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200,
+ true, false);
+
+ SequentialFile file = factory.createSequentialFile("test1", 100, 10000);
+
+ file.open();
+
+
+
+ try
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(57);
+ file.write(buffer, true);
+ fail("Exception expected");
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ try
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(200);
+ for (int i = 0; i < 200; i++)
+ {
+ buffer.put(i, (byte) 1);
+ }
+
+ file.write(buffer, true);
+
+ buffer = ByteBuffer.allocate(400);
+ for (int i = 0; i < 400; i++)
+ {
+ buffer.put(i, (byte) 2);
+ }
+
+ file.write(buffer, true);
+
+ buffer = ByteBuffer.allocate(600);
+
+ file.position(0);
+
+ file.read(buffer);
+
+ for (int i = 0; i < 200; i++)
+ {
+ assertEquals((byte)1, buffer.get(i));
+ }
+
+ for (int i = 201; i < 600; i++)
+ {
+ assertEquals("Position " + i, (byte)2, buffer.get(i));
+ }
+
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ public void testAppendAndUpdateRecords() throws Exception
+ {
+
+ final int JOURNAL_SIZE = 51 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ for (int i = 0; i < 25; i++)
+ {
+ byte[] bytes = new byte[100];
+ for (int j=0; j<bytes.length; j++)
+ {
+ bytes[j] = (byte)i;
+ }
+ journalImpl.appendAddRecord(i * 100l, (byte)i, bytes);
+ }
+
+ for (int i = 25; i < 50; i++)
+ {
+ EncodingSupport support = new SimpleEncoding(100, (byte) i);
+ journalImpl.appendAddRecord(i * 100l, (byte)i, support);
+ }
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(50, records.size());
+
+ int i=0;
+ for (RecordInfo recordItem: records)
+ {
+ assertEquals(i * 100l, recordItem.id);
+ assertEquals(i, recordItem.getUserRecordType());
+ assertEquals(100, recordItem.data.length);
+ for (int j=0;j<100;j++)
+ {
+ assertEquals((byte)i, recordItem.data[j]);
+ }
+
+ i++;
+ }
+
+ for (i = 40; i < 50; i++)
+ {
+ byte[] bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = (byte)'x';
+ }
+
+ journalImpl.appendUpdateRecord(i * 100l, (byte)i, bytes);
+ }
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ i=0;
+ for (RecordInfo recordItem: records)
+ {
+
+ if (i < 50)
+ {
+ assertEquals(i * 100l, recordItem.id);
+ assertEquals(i, recordItem.getUserRecordType());
+ assertEquals(100, recordItem.data.length);
+ for (int j=0;j<100;j++)
+ {
+ assertEquals((byte)i, recordItem.data[j]);
+ }
+ }
+ else
+ {
+ assertEquals((i - 10) * 100l, recordItem.id);
+ assertEquals(i - 10, recordItem.getUserRecordType());
+ assertTrue(recordItem.isUpdate);
+ assertEquals(10, recordItem.data.length);
+ for (int j=0;j<10;j++)
+ {
+ assertEquals((byte)'x', recordItem.data[j]);
+ }
+ }
+
+ i++;
+ }
+
+ journalImpl.stop();
+
+ }
+
+ public void testAddAndDeleteReclaimWithoutTransactions() throws Exception
+ {
+ final int JOURNAL_SIZE = 51 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ journalImpl.checkAndReclaimFiles();
+
+ journalImpl.debugWait();
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+ log.debug("Initial:--> " + journalImpl.debug());
+
+ log.debug("_______________________________");
+
+ for (int i = 0; i < 50; i++)
+ {
+ journalImpl.appendAddRecord((long)i, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+ }
+
+ // as the request to a new file is asynchronous, we need to make sure the async requests are done
+ journalImpl.debugWait();
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+ for (int i = 0; i < 50; i++)
+ {
+ journalImpl.appendDeleteRecord((long)i);
+ }
+
+ journalImpl.appendAddRecord((long)1000, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+
+ journalImpl.debugWait();
+
+ assertEquals(4, factory.listFiles("tt").size());
+
+ journalImpl.checkReclaimStatus();
+
+ log.debug(journalImpl.debug());
+
+ journalImpl.checkAndReclaimFiles();
+
+ log.debug(journalImpl.debug());
+
+ journalImpl.debugWait();
+
+ log.debug("Final:--> " + journalImpl.debug());
+
+ log.debug("_______________________________");
+
+ log.debug("Files size:" + factory.listFiles("tt").size());
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+ }
+
+ public void testReloadWithTransaction() throws Exception
+ {
+ final int JOURNAL_SIZE = 51 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ try
+ {
+ journalImpl.appendCommitRecord(1l);
+ // This was supposed to throw an exception, as the transaction was forgotten (interrupted by a reload).
+ fail("Supposed to throw exception");
+ }
+ catch (Exception e)
+ {
+ log.warn(e);
+ }
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ }
+
+ public void testReclaimWithInterruptedTransaction() throws Exception
+ {
+ final int JOURNAL_SIZE = 51 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(50,(byte) 1));
+ journalImpl.forceMoveNextFile();
+ }
+
+ journalImpl.debugWait();
+
+ //System.out.println("files = " + journalImpl.debug());
+
+ assertEquals(12, factory.listFiles("tt").size());
+
+ journalImpl.appendAddRecordTransactional(2, 1, (byte) 1, new SimpleEncoding(200,(byte) 1));
+
+ assertEquals(12, factory.listFiles("tt").size());
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ //System.out.println("Journal - " + journalImpl.debug());
+
+ try
+ {
+ journalImpl.appendCommitRecord(1l);
+ // This was supposed to throw an exception, as the transaction was forgotten (interrupted by a reload).
+ fail("Supposed to throw exception");
+ }
+ catch (Exception e)
+ {
+ log.debug("Expected exception " + e, e);
+ }
+
+ setupJournal(JOURNAL_SIZE, 1024);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ assertEquals(12, factory.listFiles("tt").size());
+
+ journalImpl.checkAndReclaimFiles();
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ records = new ArrayList<RecordInfo>();
+
+ transactions = new ArrayList<PreparedTransactionInfo>();
+
+ factory = null;
+
+ journalImpl = null;
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ // Private -------------------------------------------------------
+
+ private void setupJournal(final int journalSize, final int alignment) throws Exception
+ {
+ if (factory == null)
+ {
+ this.alignment = alignment;
+
+ factory = new FakeSequentialFileFactory(alignment,
+ true, false);
+ }
+
+ if (journalImpl != null)
+ {
+ journalImpl.stop();
+ }
+
+ journalImpl = new JournalImpl(journalSize, 2,
+ true, true,
+ factory, 1000,
+ "tt", "tt", 1000, 1000);
+
+ journalImpl.start();
+
+ records.clear();
+ transactions.clear();
+
+ journalImpl.load(records, transactions);
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+ private class SimpleEncoding implements EncodingSupport
+ {
+
+ private final int size;
+ private final byte bytetosend;
+
+ public SimpleEncoding(int size, byte bytetosend)
+ {
+ this.size = size;
+ this.bytetosend = bytetosend;
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ for (int i = 0; i < size; i++)
+ {
+ buffer.putByte(bytetosend);
+ }
+ }
+
+ public int getEncodeSize()
+ {
+ return size;
+ }
+
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -115,6 +115,7 @@
{
journal.debugWait();
journal.checkAndReclaimFiles();
+ journal.debugWait();
}
protected abstract SequentialFileFactory getFileFactory() throws Exception;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -99,6 +99,28 @@
stopJournal();
}
+ public void testRestartJournal() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ stopJournal();
+ startJournal();
+ load();
+ byte[] record = new byte[1000];
+ for (int i = 0; i < record.length; i++)
+ {
+ record[i] = (byte)'a';
+ }
+ // Appending records after restart should be valid (not throwing any exceptions)
+ for (int i = 0; i < 100; i++)
+ {
+ journal.appendAddRecord(1, (byte)1, new byte[] {(byte)'a', (byte)'a'});
+ }
+ stopJournal();
+ }
+
public void testParams() throws Exception
{
try
@@ -609,12 +631,12 @@
//10
assertEquals(journal.getAlignment()==1?6:7, journal.getDataFilesCount());
- assertEquals(journal.getAlignment()==1?3:2, journal.getFreeFilesCount());
+ assertEquals(journal.getAlignment()==1?2:1, journal.getFreeFilesCount());
assertEquals(initialNumberOfAddRecords /2 + 10, journal.getIDMapSize());
List<String> files5 = fileFactory.listFiles(fileExtension);
- assertEquals(11, files5.size());
+ assertEquals(10, files5.size());
assertEquals(1, journal.getOpenedFilesCount());
//Now delete the rest
@@ -634,13 +656,13 @@
checkAndReclaimFiles();
- assertEquals(0, journal.getDataFilesCount());
- assertEquals(9, journal.getFreeFilesCount());
+ assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+ assertEquals(journal.getAlignment()==1?8:7, journal.getFreeFilesCount());
assertEquals(0, journal.getIDMapSize());
List<String> files6 = fileFactory.listFiles(fileExtension);
- assertEquals(11, files6.size());
+ assertEquals(10, files6.size());
assertEquals(1, journal.getOpenedFilesCount());
stopJournal();
@@ -671,13 +693,13 @@
List<String> files2 = fileFactory.listFiles(fileExtension);
// 1 file for nextOpenedFile
- assertEquals(3, files2.size());
+ assertEquals(4, files2.size());
assertEquals(1, journal.getOpenedFilesCount());
//1 gets deleted and 1 gets reclaimed
- assertEquals(0, journal.getDataFilesCount());
- assertEquals(1, journal.getFreeFilesCount());
+ assertEquals(2, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
assertEquals(0, journal.getIDMapSize());
stopJournal();
@@ -911,13 +933,13 @@
//Most Should now be reclaimed - leaving 10 left in total
assertEquals(journal.getAlignment()==1?1:2, journal.getDataFilesCount());
- assertEquals(journal.getAlignment()==1?8:7, journal.getFreeFilesCount());
+ assertEquals(journal.getAlignment()==1?7:6, journal.getFreeFilesCount());
assertEquals(10, journal.getIDMapSize());
List<String> files10 = fileFactory.listFiles(fileExtension);
// The journal will aways keep one file opened (even if there are no more files on freeFiles)
- assertEquals(11, files10.size());
+ assertEquals(10, files10.size());
assertEquals(1, journal.getOpenedFilesCount());
}
@@ -1531,14 +1553,15 @@
checkAndReclaimFiles();
+
List<String> files6 = fileFactory.listFiles(fileExtension);
// files 0 and 1 should be deleted
- assertEquals(3, files6.size());
+ assertEquals(journal.getAlignment()==1?2:3, files6.size());
- assertEquals(0, journal.getDataFilesCount());
- assertEquals(1, journal.getFreeFilesCount());
+ assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getOpenedFilesCount());
assertEquals(1, journal.getIDMapSize());
@@ -1551,10 +1574,10 @@
List<String> files7 = fileFactory.listFiles(fileExtension);
- assertEquals(3, files7.size());
+ assertEquals(journal.getAlignment()==1?2:3, files7.size());
- assertEquals(0, journal.getDataFilesCount());
- assertEquals(1, journal.getFreeFilesCount());
+ assertEquals(journal.getAlignment()==1?0:1, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getOpenedFilesCount());
assertEquals(1, journal.getIDMapSize());
}
@@ -1688,7 +1711,7 @@
public void testPrepareReclaim() throws Exception
{
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
+ setup(2, 100 * 1024, true);
createJournal();
startJournal();
load();
@@ -1714,17 +1737,21 @@
assertEquals(0, journal.getIDMapSize());
//Make sure we move on to the next file
-
+
+ journal.forceMoveNextFile();
+
+ journal.debugWait();
+
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
- assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength) , journal.getDataFilesCount());
+ assertEquals(3 , files2.size());
- assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
+ assertEquals(1, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getIDMapSize());
-
+
prepare(1); // in file 1
List<String> files3 = fileFactory.listFiles(fileExtension);
@@ -1740,23 +1767,19 @@
List<String> files4 = fileFactory.listFiles(fileExtension);
- assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(),
- 2, recordLength,
- 1, JournalImpl.SIZE_PREPARE_RECORD,
- 1, JournalImpl.SIZE_DELETE_RECORD) + 2 , files4.size());
+ assertEquals(3, files4.size());
assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(),
- 2, recordLength,
- 1, JournalImpl.SIZE_PREPARE_RECORD,
- 1, JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ assertEquals(1, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(0, journal.getIDMapSize());
//Move on to another file
- addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,3); // in file 2
+ journal.forceMoveNextFile();
+
+ addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,3); // in file 2
checkAndReclaimFiles();
@@ -1779,6 +1802,8 @@
assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getIDMapSize());
assertEquals(1, journal.getOpenedFilesCount());
+
+ journal.forceMoveNextFile();
addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,4); // in file 3
@@ -1806,9 +1831,9 @@
List<String> files9 = fileFactory.listFiles(fileExtension);
- assertEquals(journal.getAlignment()==1?5:6, files9.size());
+ assertEquals(5, files9.size());
- assertEquals(journal.getAlignment()==1?3:4, journal.getDataFilesCount());
+ assertEquals(3, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getOpenedFilesCount());
assertEquals(2, journal.getIDMapSize());
@@ -1817,19 +1842,21 @@
List<String> files10 = fileFactory.listFiles(fileExtension);
- assertEquals(journal.getAlignment()==1?5:4, files10.size());
+ assertEquals(journal.getAlignment()==1?5:5, files10.size());
- assertEquals(journal.getAlignment()==1?3:2, journal.getDataFilesCount());
+ assertEquals(journal.getAlignment()==1?3:3, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(2, journal.getIDMapSize());
+ journal.forceMoveNextFile();
+
addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD,5); // in file 4
List<String> files11 = fileFactory.listFiles(fileExtension);
- assertEquals(journal.getAlignment()==1?6:4, files11.size());
+ assertEquals(6, files11.size());
- assertEquals(journal.getAlignment()==1?4:2, journal.getDataFilesCount());
+ assertEquals(4, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getOpenedFilesCount());
assertEquals(3, journal.getIDMapSize());
@@ -1859,11 +1886,11 @@
List<String> files13 = fileFactory.listFiles(fileExtension);
- assertEquals(journal.getAlignment()==1?4:5, files13.size());
+ assertEquals(4, files13.size());
assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(journal.getAlignment()==1?2:3, journal.getDataFilesCount());
+ assertEquals(2, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(2, journal.getIDMapSize());
@@ -1873,10 +1900,10 @@
log.debug("Debug journal on testPrepareReclaim ->\n" + debugJournal());
- assertEquals(5, files14.size());
+ assertEquals(4, files14.size());
assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(3, journal.getDataFilesCount());
+ assertEquals(2, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(3, journal.getIDMapSize());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -38,34 +38,75 @@
* A FakeSequentialFileFactory
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class FakeSequentialFileFactory implements SequentialFileFactory
{
private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
- private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+ // Constants -----------------------------------------------------
+ // Attributes ----------------------------------------------------
+
+ private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+
+ private final int alignment;
+
+ private final boolean supportsCallback;
+
+ private final boolean holdCallbacks;
+
+ private final List<Runnable> callbacksInHold;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback, final boolean holdCallback)
+ {
+ this.alignment = alignment;
+ this.supportsCallback = supportsCallback;
+ this.holdCallbacks = holdCallback;
+ if (holdCallbacks)
+ {
+ callbacksInHold = new ArrayList<Runnable>();
+ }
+ else
+ {
+ callbacksInHold = null;
+ }
+ }
+
+ public FakeSequentialFileFactory()
+ {
+ this(1, false, false);
+ }
+
+
+
+ // Public --------------------------------------------------------
+
public SequentialFile createSequentialFile(final String fileName, final int maxAIO, final long timeout) throws Exception
{
FakeSequentialFile sf = fileMap.get(fileName);
if (sf == null)
{
- sf = new FakeSequentialFile(fileName);
+ sf = newSequentialFile(fileName);
fileMap.put(fileName, sf);
}
else
- {
- sf.data.position(0);
+ {
+ sf.getData().position(0);
//log.debug("positioning data to 0");
}
return sf;
}
-
+
public List<String> listFiles(final String extension)
{
List<String> files = new ArrayList<String>();
@@ -93,26 +134,72 @@
public boolean isSupportsCallbacks()
{
- return false;
+ return supportsCallback;
}
public ByteBuffer newBuffer(int size)
{
- return ByteBuffer.allocate(size);
+ if (size % alignment != 0)
+ {
+ size = (size / alignment + 1) * alignment;
+ }
+ return ByteBuffer.allocateDirect(size);
}
-
+
public ByteBuffer wrapBuffer(byte[] bytes)
{
return ByteBuffer.wrap(bytes);
}
+
+ public void flushAllCallbacks()
+ {
+ for (Runnable action : callbacksInHold)
+ {
+ action.run();
+ }
+
+ callbacksInHold.clear();
+ }
+ public void flushCallback(int position)
+ {
+ Runnable run = callbacksInHold.get(position);
+ run.run();
+ callbacksInHold.remove(run);
+ }
+
+ public int getNumberOfCallbacks()
+ {
+ return callbacksInHold.size();
+ }
+
+ public int getAlignment()
+ {
+ return alignment;
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected FakeSequentialFile newSequentialFile(final String fileName)
+ {
+ return new FakeSequentialFile(fileName);
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
public class FakeSequentialFile implements SequentialFile
{
private volatile boolean open;
private final String fileName;
- private volatile ByteBuffer data;
+ private ByteBuffer data;
public ByteBuffer getData()
{
@@ -161,7 +248,7 @@
open = true;
}
- public void fill(int pos, int size, byte fillCharacter) throws Exception
+ public void fill(final int pos, final int size, final byte fillCharacter) throws Exception
{
if (!open)
{
@@ -180,24 +267,20 @@
}
}
- public int read(ByteBuffer bytes) throws Exception
+ public int read(final ByteBuffer bytes) throws Exception
{
return read(bytes, null);
}
- public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
if (!open)
{
throw new IllegalStateException("Is closed");
}
- //log.debug("read called " + bytes.array().length);
+ byte[] bytesRead = new byte[bytes.limit()];
- byte[] bytesRead = new byte[bytes.array().length];
-
- //log.debug("reading, data pos is " + data.position() + " data size is " + data.array().length);
-
data.get(bytesRead);
bytes.put(bytesRead);
@@ -209,45 +292,65 @@
return bytesRead.length;
}
- public void position(int pos) throws Exception
+ public void position(final int pos) throws Exception
{
if (!open)
{
throw new IllegalStateException("Is closed");
}
- //log.debug("reset called");
+ checkAlignment(pos);
data.position(pos);
}
- public int write(ByteBuffer bytes, IOCallback callback) throws Exception
+ public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
if (!open)
{
throw new IllegalStateException("Is closed");
}
- int position = data == null ? 0 : data.position();
+ final int position = data == null ? 0 : data.position();
+ checkAlignment(position);
+
+ checkAlignment(bytes.limit());
+
checkAndResize(bytes.capacity() + position);
- //log.debug("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+ Runnable action = new Runnable()
+ {
+
+ public void run()
+ {
+
+ data.put(bytes);
+
+ if (callback!=null) callback.done();
+ }
+
+ };
- data.put(bytes);
+ if (holdCallbacks && callback != null)
+ {
+ FakeSequentialFileFactory.this.callbacksInHold.add(action);
+ }
+ else
+ {
+ action.run();
+ }
- if (callback!=null) callback.done();
+ return bytes.limit();
- return bytes.array().length;
-
}
- public int write(ByteBuffer bytes, boolean sync) throws Exception
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
{
return write(bytes, null);
}
- private void checkAndResize(int size)
+ private void checkAndResize(final int size)
{
int oldpos = data == null ? 0 : data.position();
@@ -268,18 +371,28 @@
public int getAlignment() throws Exception
{
- return 1;
+ return alignment;
}
- public int calculateBlockStart(int position) throws Exception
+ public int calculateBlockStart(final int position) throws Exception
{
- return position;
+ int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+ return pos;
}
public String toString()
{
return "FakeSequentialFile:" + this.fileName;
}
+
+ private void checkAlignment (final int position)
+ {
+ if ( position % alignment != 0)
+ {
+ throw new IllegalStateException("Position is not aligned to " + alignment);
+ }
+ }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-11 15:26:24 UTC (rev 4676)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-12 01:14:15 UTC (rev 4677)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.Journal;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
@@ -96,7 +97,7 @@
bb.putLong(queueID);
bb.putLong(messageID);
- messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), EasyMock.aryEq(record));
+ messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));
EasyMock.replay(messageJournal, bindingsJournal);
jsm.storeAcknowledge(queueID, messageID);
EasyMock.verify(messageJournal, bindingsJournal);
@@ -147,7 +148,7 @@
bb.putLong(messageID);
final long txID = 12091921;
- messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), EasyMock.aryEq(record));
+ messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));
EasyMock.replay(messageJournal, bindingsJournal);
jsm.storeAcknowledgeTransactional(txID, queueID, messageID);
EasyMock.verify(messageJournal, bindingsJournal);
@@ -820,6 +821,53 @@
assertEquals(1, bindingsJournal.getAIOTimeout());
}
+ private EncodingSupport encodingMatch(final byte expectedRecord[])
+ {
+
+ EasyMock.reportMatcher(new IArgumentMatcher()
+ {
+
+ public void appendTo(StringBuffer buffer)
+ {
+ }
+
+ public boolean matches(Object argument)
+ {
+ EncodingSupport support = (EncodingSupport)argument;
+
+ if (support.getEncodeSize() != expectedRecord.length)
+ {
+ return false;
+ }
+
+ byte newByte[] = new byte[expectedRecord.length];
+
+ ByteBuffer buffer = ByteBuffer.wrap(newByte);
+
+ ByteBufferWrapper wrapper = new ByteBufferWrapper(buffer);
+
+ support.encode(wrapper);
+
+ byte encodingBytes[] = wrapper.array();
+
+ for (int i = 0; i < encodingBytes.length; i++)
+ {
+ if (encodingBytes[i] != expectedRecord[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ });
+
+ return null;
+ }
+
+
+
public static ServerMessage eqServerMessage(ServerMessage serverMessage)
{
EasyMock.reportMatcher(new ServerMessageMatcher(serverMessage));
More information about the jboss-cvs-commits
mailing list