[jboss-cvs] JBoss Messaging SVN: r4728 - in trunk: src/config and 20 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 24 13:09:07 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-24 13:09:06 -0400 (Thu, 24 Jul 2008)
New Revision: 4728
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
Modified:
trunk/docs/userguide/en/modules/configuration.xml
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/util/VariableLatch.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/impl/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
trunk/tests/src/org/jboss/messaging/tests/timing/util/VariableLatchTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/util/VariableLatchTest.java
Log:
More Journal improvements: Removing timeouts by always blocking on callbacks, and a few other tweaks such as configuration changes and documentation updates
Modified: trunk/docs/userguide/en/modules/configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/configuration.xml 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/docs/userguide/en/modules/configuration.xml 2008-07-24 17:09:06 UTC (rev 4728)
@@ -126,16 +126,6 @@
(parameter ignored on NIO) -->
<journal-max-aio>9000</journal-max-aio>
- <!-- Maximum time in milliseconds an AIO operation could take.
- This includes:
- - closing Asynchronous files
- - Transaction awaits
- - Awaits on non transactional writes
- -->
- <journal-aio-timeout>90000</journal-aio-timeout>
-
- <journal-task-period>5000</journal-task-period>
-
<security-enabled>true</security-enabled>
</configuration>
@@ -269,17 +259,6 @@
</para>
</listitem>
<listitem>
- <para>journal-aio-timeout</para>
- <para>Maximum amount of time any asynchronous operation can take in milliseconds. If any operation takes
- more than this amount a timeout exception will be logged. This parameter is only available on AIO which
- is only available on Linux.
- </para>
- </listitem>
- <listitem>
- <para>journal-task-period</para>
- <para>How frequently to reclaim unused journal data files, in milliseconds.</para>
- </listitem>
- <listitem>
<para>security-enabled</para>
<para>Whether security is enabled, if false no security checks are made.</para>
</listitem>
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/config/jbm-configuration.xml 2008-07-24 17:09:06 UTC (rev 4728)
@@ -95,16 +95,6 @@
-->
<journal-max-aio>5000</journal-max-aio>
-
- <!-- Maximum time in milliseconds an AIO operation could take.
- This includes:
- - closing Asynchronous files
- - Transaction awaits
- - Awaits on non transactional writes
- This shouldn't be higher than the remoting timeout
- -->
- <journal-aio-timeout>4000</journal-aio-timeout>
-
<journal-task-period>5000</journal-task-period>
</configuration>
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -39,7 +39,7 @@
* @param fileName
* @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
*/
- void open(String fileName, int maxIO, long timeout);
+ void open(String fileName, int maxIO);
/**
* Warning: This function will perform a synchronous IO, probably translating to a fstat call
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -119,7 +119,6 @@
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Lock writeLock = lock.writeLock();
private Semaphore writeSemaphore;
- private long timeout;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -131,12 +130,11 @@
// AsynchronousFile implementation
// ------------------------------------------------------------------------------------
- public void open(final String fileName, final int maxIO, final long timeout)
+ public void open(final String fileName, final int maxIO)
{
try
{
writeLock.lock();
- this.timeout = timeout;
this.maxIO = maxIO;
writeSemaphore = new Semaphore(this.maxIO);
@@ -163,9 +161,10 @@
try
{
writeLock.lock();
- if (!writeSemaphore.tryAcquire(maxIO, timeout, TimeUnit.MILLISECONDS))
+
+ while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
- throw new IllegalStateException("Timeout!");
+ log.warn("Couldn't acquire lock after 60 seconds on AIO", new Exception ("Warning: Couldn't acquire lock after 60 seconds on AIO"));
}
writeSemaphore = null;
stopPoller(handler);
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -143,14 +143,6 @@
void setJournalMaxAIO(int maxAIO);
- long getJournalAIOTimeout();
-
- void setJournalAIOTimeout(long timeout);
-
- long getJournalTaskPeriod();
-
- void setJournalTaskPeriod(long period);
-
boolean isCreateBindingsDir();
void setCreateBindingsDir(boolean create);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -91,12 +91,6 @@
public static final int DEFAULT_MAX_AIO = 5000;
- public static final long DEFAULT_AIO_TIMEOUT = 60000; // in ms
-
- public static final long DEFAULT_JOURNAL_TASK_PERIOD = 5000;
-
-
-
private static final long serialVersionUID = 4077088945050267843L;
@@ -139,10 +133,6 @@
protected int journalMaxAIO = DEFAULT_MAX_AIO;
- protected long journalAIOTimeout = DEFAULT_AIO_TIMEOUT;
-
- protected long journalTaskPeriod = DEFAULT_JOURNAL_TASK_PERIOD;
-
// remoting config
//TODO - do we really need this sever id??? I don't see why
@@ -401,16 +391,6 @@
this.journalMaxAIO = maxAIO;
}
- public long getJournalAIOTimeout()
- {
- return journalAIOTimeout;
- }
-
- public void setJournalAIOTimeout(long timeout)
- {
- this.journalAIOTimeout = timeout;
- }
-
public int getJournalMinFiles()
{
return journalMinFiles;
@@ -421,16 +401,6 @@
this.journalMinFiles = files;
}
- public long getJournalTaskPeriod()
- {
- return journalTaskPeriod;
- }
-
- public void setJournalTaskPeriod(long period)
- {
- this.journalTaskPeriod = period;
- }
-
public boolean isCreateBindingsDir()
{
return createBindingsDir;
@@ -491,12 +461,10 @@
cother.getBindingsDirectory().equals(this.getBindingsDirectory()) &&
cother.getConnectionParams().equals(this.getConnectionParams()) &&
cother.getHost().equals(this.getHost()) &&
- cother.getJournalAIOTimeout() == this.getJournalAIOTimeout() &&
cother.getJournalDirectory().equals(this.getJournalDirectory()) &&
cother.getJournalFileSize() == this.getJournalFileSize() &&
cother.getJournalMaxAIO() == this.getJournalMaxAIO() &&
cother.getJournalMinFiles() == this.getJournalMinFiles() &&
- cother.getJournalTaskPeriod() == this.getJournalTaskPeriod() &&
cother.getJournalType() == this.getJournalType() &&
cother.getKeyStorePassword() == null ?
this.getKeyStorePassword() == null : cother.getKeyStorePassword().equals(this.getKeyStorePassword()) &&
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -168,11 +168,7 @@
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles);
- journalTaskPeriod = getLong(e, "journal-task-period", journalTaskPeriod);
-
journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO);
-
- journalAIOTimeout = getLong(e, "journal-aio-timeout", journalAIOTimeout);
NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
Modified: trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -36,14 +36,4 @@
void updateRecord(RecordInfo info);
void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
-
- /**
- *
- * This may happen in a rare situation where a transaction commit timed out on AIO,
- * And right after that a rollback was fired but the previous transaction was completed when the TransactionCallback was already forgotten.
- *
- * This is because libaio's forget method is not working, so we have to come up with this "hack"
- *
- * */
- void restart();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -35,7 +35,7 @@
*/
public interface SequentialFileFactory
{
- SequentialFile createSequentialFile(String fileName, int maxIO, long timeout) throws Exception;
+ SequentialFile createSequentialFile(String fileName, int maxIO) throws Exception;
List<String> listFiles(String extension) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -60,8 +60,6 @@
int getMaxAIO();
- long getAIOTimeout();
-
void forceMoveNextFile() throws Exception;
void disableAutoReclaiming();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -56,8 +56,6 @@
private final int maxIO;
- private final long timeout;
-
private AsynchronousFile aioFile;
private AtomicLong position = new AtomicLong(0);
@@ -66,12 +64,11 @@
// serious performance problems. Because of that we make all the writes on AIO using a single thread.
private ExecutorService executor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final long timeout) throws Exception
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
{
this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
- this.timeout = timeout;
}
public int getAlignment() throws Exception
@@ -95,17 +92,13 @@
checkOpened();
opened = false;
executor.shutdown();
- if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS))
+
+ while (!executor.awaitTermination(60, TimeUnit.SECONDS))
{
- try
- {
- aioFile.close();
- }
- catch (Exception ignored)
- {
- }
- throw new Exception("Timeout!");
+ log.warn("Executor on file " + this.fileName + " couldn't complete its tasks in 60 seconds.",
+ new Exception ("Warning: Executor on file " + this.fileName + " couldn't complete its tasks in 60 seconds.") );
}
+
aioFile.close();
aioFile = null;
}
@@ -174,7 +167,7 @@
opened = true;
executor = Executors.newSingleThreadExecutor();
aioFile = new AsynchronousFileImpl();
- aioFile.open(journalDir + "/" + fileName, maxIO, timeout);
+ aioFile.open(journalDir + "/" + fileName, maxIO);
position.set(0);
}
@@ -203,7 +196,7 @@
int bytesRead = read (bytes, waitCompletion);
- waitCompletion.waitLatch(timeout);
+ waitCompletion.waitLatch();
return bytesRead;
}
@@ -228,7 +221,7 @@
int bytesWritten = write(bytes, completion);
- completion.waitLatch(timeout);
+ completion.waitLatch();
return bytesWritten;
}
@@ -315,20 +308,14 @@
latch.countDown();
}
- public boolean waitLatch(long timeout) throws Exception
+ public void waitLatch() throws Exception
{
- if (latch.await(timeout, TimeUnit.MILLISECONDS))
- {
- if (errorMessage != null)
- {
- throw new MessagingException(errorCode, errorMessage);
- }
- return true;
- }
- else
- {
- return false;
- }
+ latch.await();
+ if (errorMessage != null)
+ {
+ throw new MessagingException(errorCode, errorMessage);
+ }
+ return;
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -41,9 +41,9 @@
super(journalDir);
}
- public SequentialFile createSequentialFile(final String fileName, final int maxIO, final long timeout) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO) throws Exception
{
- return new AIOSequentialFile(journalDir, fileName, maxIO, timeout);
+ return new AIOSequentialFile(journalDir, fileName, maxIO);
}
public boolean isSupportsCallbacks()
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.LoadManager;
@@ -148,9 +149,6 @@
// used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
- // used for Asynchronous IO only (ignored on NIO).
- private final long aioTimeout; // in ms
-
private final int fileSize;
private final int minFiles;
@@ -206,7 +204,7 @@
public JournalImpl(final int fileSize, final int minFiles,
final boolean syncTransactional, final boolean syncNonTransactional,
final SequentialFileFactory fileFactory,
- final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
+ final String filePrefix, final String fileExtension, final int maxAIO)
{
if (fileSize < MIN_FILE_SIZE)
{
@@ -232,10 +230,6 @@
{
throw new IllegalStateException("maxAIO should aways be a positive number");
}
- if (aioTimeout < 1)
- {
- throw new IllegalStateException("aio-timeout cannot be less than 1 second");
- }
this.fileSize = fileSize;
@@ -252,8 +246,6 @@
this.fileExtension = fileExtension;
this.maxAIO = maxAIO;
-
- this.aioTimeout = aioTimeout;
}
// Journal implementation ----------------------------------------------------------------
@@ -575,7 +567,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.get(txID);
+ JournalTransaction tx = transactionInfos.remove(txID);
if (tx == null)
{
@@ -584,7 +576,6 @@
JournalFile usedFile = writeTransaction(COMMIT_RECORD, txID, tx);
- transactionInfos.remove(txID);
transactionCallbacks.remove(txID);
tx.commit(usedFile);
@@ -598,7 +589,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.get(txID);
+ JournalTransaction tx = transactionInfos.remove(txID);
if (tx == null)
{
@@ -617,7 +608,6 @@
JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- transactionInfos.remove(txID);
transactionCallbacks.remove(txID);
tx.rollback(usedFile);
@@ -654,12 +644,6 @@
{
recordsToDelete.add(id);
}
-
- public void restart()
- {
- recordsToDelete.clear();
- records.clear();
- }
});
@@ -683,440 +667,393 @@
throw new IllegalStateException("Journal must be in started state");
}
- boolean fileConsistent = true;
+ Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
- Map<Long, TransactionHolder> transactions = null;
+ List<JournalFile> orderedFiles = orderFiles();
int lastDataPos = -1;
-
- long maxMessageID = -1;
-
+
long maxTransactionID = -1;
- HashSet<Long> commitsToForget = new HashSet<Long>();
- HashSet<Long> performedCommits = new HashSet<Long>();
-
- do
- {
-
- if (!fileConsistent)
- {
- loadManager.restart();
- }
+ long maxMessageID = -1;
+
+ for (JournalFile file: orderedFiles)
+ {
+ file.getFile().open();
- fileConsistent = true;
+ ByteBuffer bb = fileFactory.newBuffer(fileSize);
- performedCommits.clear();
+ int bytesRead = file.getFile().read(bb);
- dataFiles.clear();
- freeFiles.clear();
- currentFile = null;
+ if (bytesRead != fileSize)
+ {
+ //deal with this better
+
+ throw new IllegalStateException("File is wrong size " + bytesRead +
+ " expected " + fileSize + " : " + file.getFile().getFileName());
+ }
- transactions = new LinkedHashMap<Long, TransactionHolder>();
+ //First long is the ordering timestamp, we just jump its position
+ bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
- List<JournalFile> orderedFiles = orderFiles();
+ boolean hasData = false;
- lastDataPos = -1;
-
- maxTransactionID = -1;
-
- maxMessageID = -1;
-
- for (JournalFile file: orderedFiles)
- {
- file.getFile().open();
+ while (bb.hasRemaining())
+ {
+ final int pos = bb.position();
- ByteBuffer bb = fileFactory.newBuffer(fileSize);
+ byte recordType = bb.get();
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ {
+ if (trace)
+ {
+ log.trace("Invalid record type at " + bb.position() + " file:" + file);
+ }
+ continue;
+ }
+
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
+ }
+
+ int readFileId = bb.getInt();
- int bytesRead = file.getFile().read(bb);
-
- if (bytesRead != fileSize)
+ if (readFileId != file.getOrderingID())
{
- //deal with this better
-
- throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " + fileSize + " : " + file.getFile().getFileName());
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
+
+ bb.position(pos + 1);
+ //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
+ continue;
}
- //First long is the ordering timestamp, we just jump its position
- bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
+ long transactionID = 0;
- boolean hasData = false;
+ if (isTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+ transactionID = bb.getLong();
+ maxTransactionID = Math.max(maxTransactionID, transactionID);
+ }
- while (bb.hasRemaining())
+ long recordID = 0;
+ if (!isCompleteTransaction(recordType))
{
- final int pos = bb.position();
-
- byte recordType = bb.get();
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ if (bb.position() + SIZE_LONG > fileSize)
{
- if (trace)
- {
- log.trace("Invalid record type at " + bb.position() + " file:" + file);
- }
continue;
}
-
+ recordID = bb.getLong();
+ maxMessageID = Math.max(maxMessageID, recordID);
+ }
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+ byte userRecordType = 0;
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
if (bb.position() + SIZE_INT > fileSize)
{
continue;
}
-
- int readFileId = bb.getInt();
- if (readFileId != file.getOrderingID())
+ variableSize = bb.getInt();
+
+ if (bb.position() + variableSize > fileSize)
{
- // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
- hasData = true;
-
- bb.position(pos + 1);
- //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
continue;
}
- long transactionID = 0;
+ userRecordType = bb.get();
- if (isTransaction(recordType))
+ record = new byte[variableSize];
+ bb.get(record);
+ }
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ variableSize = bb.getInt() * SIZE_INT * 2;
+ }
+
+ int recordSize = getRecordSize(recordType);
+
+ if (pos + recordSize + variableSize > fileSize)
+ {
+ continue;
+ }
+
+ int oldPos = bb.position();
+
+ bb.position(pos + variableSize + recordSize - SIZE_INT);
+
+ int checkSize = bb.getInt();
+
+ if (checkSize != variableSize + recordSize)
+ {
+ log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
+ bb.position(pos + SIZE_BYTE);
+ continue;
+ }
+
+ bb.position(oldPos);
+
+ switch(recordType)
+ {
+ case ADD_RECORD:
+ {
+ loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+
+ posFilesMap.put(recordID, new PosFiles(file));
+
+ hasData = true;
+
+ break;
+ }
+ case UPDATE_RECORD:
{
- if (bb.position() + SIZE_LONG > fileSize)
+ loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ hasData = true;
+ file.incPosCount();
+
+ PosFiles posFiles = posFilesMap.get(recordID);
+
+ if (posFiles != null)
{
- continue;
+ //It's legal for this to be null. The file(s) with the may have been deleted
+ //just leaving some updates in this file
+
+ posFiles.addUpdateFile(file);
}
- transactionID = bb.getLong();
- maxTransactionID = Math.max(maxTransactionID, transactionID);
- }
-
- long recordID = 0;
- if (!isCompleteTransaction(recordType))
+
+ break;
+ }
+ case DELETE_RECORD:
{
- if (bb.position() + SIZE_LONG > fileSize)
+ loadManager.deleteRecord(recordID);
+ hasData = true;
+
+ PosFiles posFiles = posFilesMap.remove(recordID);
+
+ if (posFiles != null)
{
- continue;
+ posFiles.addDelete(file);
+ }
+
+ break;
+ }
+ case ADD_RECORD_TX:
+ case UPDATE_RECORD_TX:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ if (tx == null)
+ {
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
- recordID = bb.getLong();
- maxMessageID = Math.max(maxMessageID, recordID);
- }
-
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
- byte userRecordType = 0;
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
- if (bb.position() + SIZE_INT > fileSize)
+
+ tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
+
+ JournalTransaction tnp = transactionInfos.get(transactionID);
+
+ if (tnp == null)
{
- continue;
+ tnp = new JournalTransaction();
+
+ transactionInfos.put(transactionID, tnp);
}
- variableSize = bb.getInt();
+ tnp.addPositive(file, recordID);
- if (bb.position() + variableSize > fileSize)
+ hasData = true;
+
+ break;
+ }
+ case DELETE_RECORD_TX:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ if (tx == null)
{
- continue;
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
- userRecordType = bb.get();
+ tx.recordsToDelete.add(recordID);
- record = new byte[variableSize];
- bb.get(record);
- }
-
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- variableSize = bb.getInt() * SIZE_INT * 2;
- }
-
- int recordSize = getRecordSize(recordType);
-
- if (pos + recordSize + variableSize > fileSize)
- {
- continue;
- }
-
- int oldPos = bb.position();
-
- bb.position(pos + variableSize + recordSize - SIZE_INT);
-
- int checkSize = bb.getInt();
-
- if (checkSize != variableSize + recordSize)
- {
- log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
- // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
- hasData = true;
- bb.position(pos + SIZE_BYTE);
- continue;
- }
-
- bb.position(oldPos);
-
- switch(recordType)
- {
- case ADD_RECORD:
- {
- loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
-
- posFilesMap.put(recordID, new PosFiles(file));
-
- hasData = true;
-
- break;
- }
- case UPDATE_RECORD:
+ JournalTransaction tnp = transactionInfos.get(transactionID);
+
+ if (tnp == null)
{
- loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
- hasData = true;
- file.incPosCount();
+ tnp = new JournalTransaction();
- PosFiles posFiles = posFilesMap.get(recordID);
-
- if (posFiles != null)
- {
- //It's legal for this to be null. The file(s) with the may have been deleted
- //just leaving some updates in this file
-
- posFiles.addUpdateFile(file);
- }
-
- break;
- }
- case DELETE_RECORD:
+ transactionInfos.put(transactionID, tnp);
+ }
+
+ tnp.addNegative(file, recordID);
+
+ hasData = true;
+
+ break;
+ }
+ case PREPARE_RECORD:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ // We need to read it even if transaction was not found, or the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+ Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+
+ if (tx != null)
{
- loadManager.deleteRecord(recordID);
- hasData = true;
- PosFiles posFiles = posFilesMap.remove(recordID);
+ tx.prepared = true;
- if (posFiles != null)
- {
- posFiles.addDelete(file);
- }
+ JournalTransaction journalTransaction = transactionInfos.get(transactionID);
- break;
- }
- case ADD_RECORD_TX:
- case UPDATE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- if (tx == null)
+ if (journalTransaction == null)
{
- tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
- tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
- JournalTransaction tnp = transactionInfos.get(transactionID);
+ boolean healthy = checkTransactionHealth(
+ journalTransaction, orderedFiles, values);
- if (tnp == null)
+ if (healthy)
{
- tnp = new JournalTransaction();
-
- transactionInfos.put(transactionID, tnp);
+ journalTransaction.prepare(file);
}
-
- tnp.addPositive(file, recordID);
-
- hasData = true;
-
- break;
- }
- case DELETE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- if (tx == null)
+ else
{
- tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+ journalTransaction.setInvalid(true);
+ tx.invalid = true;
}
- tx.recordsToDelete.add(recordID);
+ hasData = true;
+ }
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+ TransactionHolder tx = transactions.remove(transactionID);
+
+ // We need to read it even if transaction was not found, or the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+ Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+
+ if (tx != null)
+ {
- JournalTransaction tnp = transactionInfos.get(transactionID);
+ JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
- if (tnp == null)
+ if (journalTransaction == null)
{
- tnp = new JournalTransaction();
-
- transactionInfos.put(transactionID, tnp);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
+
+ boolean healthy = checkTransactionHealth(
+ journalTransaction, orderedFiles, values);
- tnp.addNegative(file, recordID);
- hasData = true;
-
- break;
- }
- case PREPARE_RECORD:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- // We need to read it even if transaction was not found, or the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
- if (tx != null)
+ if (healthy)
{
-
- tx.prepared = true;
-
- JournalTransaction journalTransaction = transactionInfos.get(transactionID);
-
- if (journalTransaction == null)
+ for (RecordInfo txRecord: tx.recordInfos)
{
- throw new IllegalStateException("Cannot find tx " + transactionID);
- }
-
-
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
-
- if (healthy)
- {
- journalTransaction.prepare(file);
- }
- else
- {
- log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
- journalTransaction.setInvalid(true);
- tx.invalid = true;
- }
-
- hasData = true;
- }
-
- break;
- }
- case COMMIT_RECORD:
- {
- TransactionHolder tx = transactions.remove(transactionID);
-
- // We need to read it even if transaction was not found, or the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
- if (tx != null)
- {
-
- JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
-
- if (journalTransaction == null)
- {
- throw new IllegalStateException("Cannot find tx " + transactionID);
- }
-
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
-
-
- if (commitsToForget.contains(transactionID))
- {
- log.warn("Transaction being ignored because of a post rollback");
- journalTransaction.forget();
- }
- else
- if (healthy)
- {
- performedCommits.add(transactionID);
-
- for (RecordInfo txRecord: tx.recordInfos)
+ if (txRecord.isUpdate)
{
- if (txRecord.isUpdate)
- {
- loadManager.updateRecord(txRecord);
- }
- else
- {
- loadManager.addRecord(txRecord);
- }
+ loadManager.updateRecord(txRecord);
}
-
- for (Long deleteValue: tx.recordsToDelete)
+ else
{
- loadManager.deleteRecord(deleteValue);
+ loadManager.addRecord(txRecord);
}
- journalTransaction.commit(file);
}
- else
+
+ for (Long deleteValue: tx.recordsToDelete)
{
- log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
- journalTransaction.forget();
+ loadManager.deleteRecord(deleteValue);
}
-
- hasData = true;
+ journalTransaction.commit(file);
}
+ else
+ {
+ log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
+ journalTransaction.forget();
+ }
- break;
+ hasData = true;
}
- case ROLLBACK_RECORD:
- {
- TransactionHolder tx = transactions.remove(transactionID);
+
+ break;
+ }
+ case ROLLBACK_RECORD:
+ {
+ TransactionHolder tx = transactions.remove(transactionID);
+
+ if (tx != null)
+ {
+ JournalTransaction tnp = transactionInfos.remove(transactionID);
- if (performedCommits.contains(transactionID) && !commitsToForget.contains(transactionID))
+ if (tnp == null)
{
- log.warn("Transaction " + transactionID + " was rolled back after its commit! Reload will need to be restarted with that transaction being ignored");
- commitsToForget.add(transactionID);
- fileConsistent = false;
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
+ tnp.rollback(file);
- if (tx != null)
- {
- JournalTransaction tnp = transactionInfos.remove(transactionID);
-
- if (tnp == null)
- {
- throw new IllegalStateException("Cannot find tx " + transactionID);
- }
-
- tnp.rollback(file);
-
- hasData = true;
- }
-
- break;
+ hasData = true;
}
- default:
- {
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " + recordType);
- }
+
+ break;
}
-
- checkSize = bb.getInt();
-
- if (checkSize != variableSize + recordSize)
+ default:
{
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " + recordType);
}
-
- bb.position(file.getFile().calculateBlockStart(bb.position()));
-
- if (recordType != FILL_CHARACTER)
- {
- lastDataPos = bb.position();
- }
}
- file.getFile().close();
+ checkSize = bb.getInt();
- if (hasData)
- {
- dataFiles.add(file);
+ if (checkSize != variableSize + recordSize)
+ {
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
}
- else
- {
- //Empty dataFiles with no data
- freeFiles.add(file);
- }
+
+ bb.position(file.getFile().calculateBlockStart(bb.position()));
+
+ if (recordType != FILL_CHARACTER)
+ {
+ lastDataPos = bb.position();
+ }
}
- transactionIDSequence.set(maxTransactionID + 1);
- }
- while (!fileConsistent);
+
+ file.getFile().close();
+
+ if (hasData)
+ {
+ dataFiles.add(file);
+ }
+ else
+ {
+ //Empty dataFiles with no data
+ freeFiles.add(file);
+ }
+ }
+ transactionIDSequence.set(maxTransactionID + 1);
//Create any more files we need
@@ -1258,7 +1195,7 @@
{
for (TransactionCallback callback: transactionCallbacks.values())
{
- callback.waitCompletion(aioTimeout);
+ callback.waitCompletion();
}
if (filesExecutor != null && !filesExecutor.isShutdown())
@@ -1371,11 +1308,6 @@
return maxAIO;
}
- public long getAIOTimeout()
- {
- return aioTimeout;
- }
-
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
@@ -1432,9 +1364,9 @@
}
filesExecutor.shutdown();
- if (!filesExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
+ while (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
- throw new IllegalStateException("Time out waiting for open executor to finish");
+ log.warn("Couldn't stop Journal after 60 seconds", new Exception ("Warning: Couldn't stop journal after 60 Seconds"));
}
for (JournalFile file: openedFiles)
@@ -1624,7 +1556,7 @@
for (String fileName: fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
file.open();
@@ -1671,6 +1603,10 @@
{
checkFile(size);
bb.position(SIZE_BYTE);
+ if (currentFile == null)
+ {
+ throw new Exception ("Current file = null");
+ }
bb.putInt(currentFile.getOrderingID());
bb.rewind();
if (callback != null)
@@ -1678,7 +1614,7 @@
currentFile.getFile().write(bb, callback);
if (sync)
{
- callback.waitCompletion(aioTimeout);
+ callback.waitCompletion();
}
}
else
@@ -1702,7 +1638,7 @@
if (trace) log.trace("Creating file " + fileName);
- SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO);
sequentialFile.open();
@@ -1804,13 +1740,18 @@
});
}
- JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
+ JournalFile nextFile = null;
- if (nextFile == null)
+ while (nextFile == null)
{
- throw new IllegalStateException("Timed out waiting for an opened file");
+ nextFile = openedFiles.poll(60, TimeUnit.SECONDS);
+ if (nextFile == null)
+ {
+ log.warn("Couldn't open a file in 60 Seconds", new Exception ("Warning: Couldn't open a file in 60 Seconds"));
+ }
}
+
return nextFile;
}
@@ -1874,7 +1815,7 @@
return tx;
}
- private TransactionCallback getTransactionCallback(final long transactionId)
+ private TransactionCallback getTransactionCallback(final long transactionId) throws MessagingException
{
if (fileFactory.isSupportsCallbacks() && syncTransactional)
{
@@ -1886,6 +1827,11 @@
transactionCallbacks.put(transactionId, callback);
}
+ if (callback.errorMessage != null)
+ {
+ throw new MessagingException(callback.errorCode, callback.errorMessage);
+ }
+
callback.countUp();
return callback;
}
@@ -1915,9 +1861,9 @@
countLatch.down();
}
- public void waitCompletion(long timeout) throws InterruptedException
+ public void waitCompletion() throws InterruptedException
{
- countLatch.waitCompletion(timeout);
+ countLatch.waitCompletion();
if (errorMessage != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -42,8 +42,8 @@
super(journalDir);
}
- // The timeout is ignored on NIO
- public SequentialFile createSequentialFile(final String fileName, final int maxIO, final long timeout)
+ // maxIO is ignored on NIO
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
return new NIOSequentialFile(journalDir, fileName);
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -128,7 +128,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, "jbm-bindings", "bindings", 1, 1);
+ bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, "jbm-bindings", "bindings", 1);
String journalDir = config.getJournalDirectory();
@@ -171,7 +171,7 @@
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.isJournalSyncTransactional(),
config.isJournalSyncNonTransactional(), journalFF,
- "jbm-data", "jbm", config.getJournalMaxAIO(), config.getJournalAIOTimeout());
+ "jbm-data", "jbm", config.getJournalMaxAIO());
}
/* This constructor is only used for testing */
Modified: trunk/src/main/org/jboss/messaging/util/VariableLatch.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -118,11 +118,8 @@
control.acquireSharedInterruptibly(1);
}
- public void waitCompletion(final long milliseconds) throws InterruptedException
+ public boolean waitCompletion(final long milliseconds) throws InterruptedException
{
- if (!control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds)))
- {
- throw new IllegalStateException("Timeout!");
- }
+ return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -84,7 +84,7 @@
{
log.debug(sync?"Sync test:":"Async test");
AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
- jlibAIO.open(FILE_NAME, 21000, 120);
+ jlibAIO.open(FILE_NAME, 21000);
try
{
log.debug("Preallocating file");
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -74,7 +74,7 @@
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
for (int i = 0; i < 1000; i++)
{
- controller.open(FILE_NAME, 10000, 1200);
+ controller.open(FILE_NAME, 10000);
controller.close();
}
@@ -89,8 +89,8 @@
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
- controller.open(FILE_NAME + ".1", 10000, 1200);
- controller2.open(FILE_NAME + ".2", 10000, 1200);
+ controller.open(FILE_NAME + ".1", 10000);
+ controller2.open(FILE_NAME + ".2", 10000);
int numberOfLines = 1000;
int size = 1024;
@@ -222,12 +222,12 @@
final int SIZE = 512;
- controller.open(FILE_NAME, 10, 1200);
+ controller.open(FILE_NAME, 10);
controller.close();
controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 10, 1200);
+ controller.open(FILE_NAME, 10);
controller.fill(0, 1, 512, (byte) 'j');
@@ -312,7 +312,7 @@
final int NUMBER_LINES = 5000;
final int SIZE = 1024;
- controller.open(FILE_NAME, 1000, 1200);
+ controller.open(FILE_NAME, 1000);
log.debug("Filling file");
@@ -347,7 +347,7 @@
log.debug("Closing file");
controller.close();
log.debug("Reading file");
- controller.open(FILE_NAME, 10, 1200);
+ controller.open(FILE_NAME, 10);
ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
@@ -414,7 +414,7 @@
CountDownLatch readLatch = new CountDownLatch(NUMBER_LINES);
final int SIZE = 1024;
- controller.open(FILE_NAME, 10000, 1200);
+ controller.open(FILE_NAME, 10000);
log.debug("Filling file");
@@ -447,7 +447,7 @@
assertEquals(0, readLatch.getCount());
readLatch.await();
log.debug("Reading file");
- controller.open(FILE_NAME, 10, 1200);
+ controller.open(FILE_NAME, 10);
ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
@@ -503,7 +503,7 @@
throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, aioLimit, 1200);
+ controller.open(FILE_NAME, aioLimit);
try
{
@@ -579,7 +579,7 @@
final int SIZE = 1024;
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000, 1200);
+ controller.open(FILE_NAME, 2000);
ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
encodeBufer(block);
@@ -614,7 +614,7 @@
public void testInvalidWrite() throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000, 120);
+ controller.open(FILE_NAME, 2000);
try
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/impl/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/impl/AIOSequentialFileFactoryTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/impl/AIOSequentialFileFactoryTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -79,7 +79,7 @@
public void testBuffer() throws Exception
{
- SequentialFile file = factory.createSequentialFile("filtetmp.log", 10, 120);
+ SequentialFile file = factory.createSequentialFile("filtetmp.log", 10);
file.open();
ByteBuffer buff = factory.newBuffer(10);
assertEquals(512, buff.limit());
@@ -137,7 +137,7 @@
final int NUMBER_OF_RECORDS = 10000;
- SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000, 12000);
+ SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
file.open();
file.fill(0, 512 * NUMBER_OF_RECORDS, (byte)'a');
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -201,7 +201,7 @@
{
Journal journal =
new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000);
journal.start();
@@ -263,7 +263,7 @@
Journal journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000);
journal.start();
@@ -289,7 +289,7 @@
journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000);
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -227,11 +227,6 @@
numberOfUpdates++;
}
-
- public void restart()
- {
- ex = new Exception ("Journal was restarted");
- }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -107,10 +107,6 @@
public void updateRecord(RecordInfo info)
{
}
-
- public void restart()
- {
- }
});
@@ -146,8 +142,7 @@
public static JournalImpl createJournal(String journalType, String journalDir)
{
JournalImpl journal = new JournalImpl(10485760, 2, true,
- false, getFactory(journalType, journalDir), "journaltst", "tst", 5000,
- 60000);
+ false, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
return journal;
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -1,235 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.timing.core.journal.impl;
-
-import java.util.ArrayList;
-
-import org.jboss.messaging.core.journal.PreparedTransactionInfo;
-import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-public class JournalAsyncTimeoutsTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private FakeSequentialFileFactory factory;
-
- JournalImpl journalImpl = null;
-
- private ArrayList<RecordInfo> records = null;
-
- private ArrayList<PreparedTransactionInfo> transactions = null;
-
- // Static --------------------------------------------------------
-
- private static final Logger log = Logger
- .getLogger(JournalAsyncTimeoutsTest.class);
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testAsynchronousCommit() throws Exception
- {
-// final int JOURNAL_SIZE = 20000;
-//
-// setupJournal(JOURNAL_SIZE, 100, 5);
-//
-// assertEquals(2, factory.listFiles("tt").size());
-//
-// assertEquals(0, records.size());
-// assertEquals(0, transactions.size());
-//
-// for (int i = 0; i < 10 ; i++)
-// {
-// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
-// journalImpl.forceMoveNextFile();
-// }
-//
-//
-// for (int i = 10; i < 20 ; i++)
-// {
-// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
-// journalImpl.forceMoveNextFile();
-// }
-//
-// journalImpl.forceMoveNextFile();
-//
-// journalImpl.appendCommitRecord(1l);
-//
- }
-
-
-
- public void testTransactionTimeoutOnCommit() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 1, 5, 1000);
-
- assertEquals(5, factory.listFiles("tt").size());
-
- assertEquals(0, records.size());
- assertEquals(0, transactions.size());
-
- factory.setHoldCallbacks(true);
-
- for (int i = 0; i < 20; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
- new SimpleEncoding(1, (byte) 15));
- }
-
- try
- {
- journalImpl.appendCommitRecord(1l);
- fail ("Supposed to timeout");
- }
- catch (Exception e)
- {
- }
-
- factory.flushAllCallbacks();
-
- factory.setHoldCallbacks(false);
-
- journalImpl.appendRollbackRecord(1l);
-
- setupJournal(JOURNAL_SIZE, 1, 5, 1000);
-
- assertEquals(0, records.size());
- assertEquals(0, journalImpl.getDataFilesCount());
- }
-
- public void testTransactionTimeoutOnRollback() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 1, 5, 1000);
-
- assertEquals(5, factory.listFiles("tt").size());
-
- assertEquals(0, records.size());
- assertEquals(0, transactions.size());
-
- factory.setHoldCallbacks(true);
-
- for (int i = 0; i < 20; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
- new SimpleEncoding(1, (byte) 15));
- }
-
- try
- {
- journalImpl.appendRollbackRecord(1l);
- fail ("Supposed to timeout");
- }
- catch (Exception e)
- {
- }
-
- factory.flushAllCallbacks();
-
- factory.setHoldCallbacks(false);
-
- // it shouldn't fail
- journalImpl.appendRollbackRecord(1l);
-
- setupJournal(JOURNAL_SIZE, 1, 5, 1000);
-
- assertEquals(0, records.size());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- records = new ArrayList<RecordInfo>();
-
- transactions = new ArrayList<PreparedTransactionInfo>();
-
- factory = null;
-
- journalImpl = null;
-
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
-
- if (journalImpl != null)
- {
- try
- {
- journalImpl.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- // Private -------------------------------------------------------
- private void setupJournal(final int journalSize, final int alignment,
- final int numberOfMinimalFiles, final int timeout) throws Exception
- {
- if (factory == null)
- {
- factory = new FakeSequentialFileFactory(alignment, true);
- }
-
- if (journalImpl != null)
- {
- journalImpl.stop();
- }
-
- journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true,
- true, factory, "tt", "tt", 1000, timeout);
-
- journalImpl.start();
-
- records.clear();
- transactions.clear();
-
- journalImpl.load(records, transactions);
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/util/VariableLatchTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/util/VariableLatchTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -25,7 +25,7 @@
import org.jboss.messaging.util.VariableLatch;
/**
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
*/
public class VariableLatchTest extends UnitTestCase
{
@@ -36,14 +36,7 @@
latch.up();
long start = System.currentTimeMillis();
- try
- {
- latch.waitCompletion(1000);
- fail("It was suppsoed to throw an exception");
- }
- catch (Exception ignored)
- {
- }
+ assertFalse(latch.waitCompletion(1000));
long end = System.currentTimeMillis();
assertTrue("Timeout didn't work correctly", end - start >= 1000
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -78,8 +78,6 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE, conf.getJournalFileSize());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
assertEquals(ConfigurationImpl.DEFAULT_MAX_AIO, conf.getJournalMaxAIO());
- assertEquals(ConfigurationImpl.DEFAULT_AIO_TIMEOUT, conf.getJournalAIOTimeout());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_TASK_PERIOD, conf.getJournalTaskPeriod());
}
public void testSetGetAttributes()
@@ -184,14 +182,6 @@
conf.setJournalMaxAIO(i);
assertEquals(i, conf.getJournalMaxAIO());
- l = randomLong();
- conf.setJournalAIOTimeout(l);
- assertEquals(l, conf.getJournalAIOTimeout());
-
- l = randomLong();
- conf.setJournalTaskPeriod(l);
- assertEquals(l, conf.getJournalTaskPeriod());
-
i = randomInt();
conf.setServerID(i);
assertEquals(i, conf.getServerID());
@@ -313,12 +303,6 @@
i = randomInt();
conf.setJournalMaxAIO(i);
- l = randomLong();
- conf.setJournalAIOTimeout(l);
-
- l = randomLong();
- conf.setJournalTaskPeriod(l);
-
i = randomInt();
conf.setServerID(i);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -59,8 +59,6 @@
assertEquals(12345678, conf.getJournalFileSize());
assertEquals(100, conf.getJournalMinFiles());
assertEquals(56546, conf.getJournalMaxAIO());
- assertEquals(432323, conf.getJournalAIOTimeout());
- assertEquals(6544, conf.getJournalTaskPeriod());
assertEquals(false, conf.getConnectionParams().isInVMOptimisationEnabled());
assertEquals(7654, conf.getConnectionParams().getCallTimeout());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -72,7 +72,7 @@
FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200, true);
- SequentialFile file = factory.createSequentialFile("test1", 100, 10000);
+ SequentialFile file = factory.createSequentialFile("test1", 100);
file.open();
@@ -499,52 +499,6 @@
}
-
- public void testDeleteme() throws Exception
- {
- final int JOURNAL_SIZE = 2000;
-
- setupJournal(JOURNAL_SIZE, 100);
-
- for (int i = 0; i < 10; i++)
- {
- journalImpl.appendAddRecordTransactional(1, i, (byte) 1, new SimpleEncoding(1,(byte) 1));
- journalImpl.forceMoveNextFile();
- }
-
- journalImpl.appendCommitRecord(1l);
-
- journalImpl.debugWait();
-
- setupJournal(JOURNAL_SIZE, 100);
-
- assertEquals(10, records.size());
- assertEquals(0, transactions.size());
-
- journalImpl.checkAndReclaimFiles();
-
- for (int i = 0; i < 2; i++)
- {
- journalImpl.appendDeleteRecordTransactional(2l, (long)i);
- //journalImpl.appendAddRecordTransactional(2l, i*10, (byte) 1, new SimpleEncoding(1,(byte) 1));
- journalImpl.forceMoveNextFile();
- }
-
- journalImpl.appendCommitRecord(2l);
-
- journalImpl.appendAddRecord(100, (byte)1, new SimpleEncoding(5, (byte)1));
-
- journalImpl.forceMoveNextFile();
-
- journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1));
-
- journalImpl.checkAndReclaimFiles();
-
- setupJournal(JOURNAL_SIZE, 100);
- }
-
-
-
public void testTotalSize() throws Exception
{
final int JOURNAL_SIZE = 2000;
@@ -589,7 +543,7 @@
journalImpl.appendCommitRecord(1l);
- SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000);
file.open();
@@ -658,7 +612,7 @@
journalImpl.appendCommitRecord(2l);
- SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000);
file.open();
@@ -771,7 +725,7 @@
journalImpl.appendCommitRecord(1l);
- SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000);
file.open();
@@ -1013,7 +967,7 @@
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles,
true, true,
factory,
- "tt", "tt", 1000, 10000);
+ "tt", "tt", 1000);
journalImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -363,9 +363,9 @@
stubValues();
- EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt(), EasyMock.anyLong())).andReturn(file1);
+ EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt())).andReturn(file1);
- EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt(), EasyMock.anyLong())).andReturn(file2);
+ EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt())).andReturn(file2);
file1.open();
@@ -399,7 +399,7 @@
JournalImpl journalImpl = new JournalImpl(100 * 1024, 2,
true, true,
mockFactory,
- "tt", "tt", 1000, 1000);
+ "tt", "tt", 1000);
journalImpl.start();
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -0,0 +1,333 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+public class JournalAsyncTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private FakeSequentialFileFactory factory;
+
+ JournalImpl journalImpl = null;
+
+ private ArrayList<RecordInfo> records = null;
+
+ private ArrayList<PreparedTransactionInfo> transactions = null;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ // Commits on a worker thread while the add-record callbacks are held; the worker must stay alive until they are flushed.
+ public void testAsynchronousCommit() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 5);
+ // From here on the fake factory queues write callbacks instead of running them.
+ factory.setHoldCallbacks(true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class LocalThread extends Thread
+ {
+ Exception e;
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1,(byte)0));
+ }
+ // Signal the test thread, stop holding further callbacks, then commit.
+ latch.countDown();
+ factory.setHoldCallbacks(false);
+ journalImpl.appendCommitRecord(1l);
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+ }
+ };
+
+ LocalThread t = new LocalThread();
+ t.start();
+ // Wait until the worker has issued all ten add records.
+ latch.await();
+
+ Thread.yield();
+ // The worker is expected to still be running: the held callbacks have not been flushed yet.
+ assertTrue(t.isAlive());
+ // Flush the held callbacks so the worker can finish.
+ factory.flushAllCallbacks();
+
+ t.join();
+ // Rethrow anything the worker captured so it fails the test.
+ if (t.e != null)
+ {
+ throw t.e;
+ }
+ }
+ // Rolls back on a worker thread with held callback 0 flagged as an error; the worker must end with an exception.
+ public void testAsynchronousRollbackWithError() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 5);
+ // From here on the fake factory queues write callbacks instead of running them.
+ factory.setHoldCallbacks(true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class LocalThread extends Thread
+ {
+ Exception e;
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1,(byte)0));
+ }
+ // Signal the test thread, then roll back while callbacks are still being held.
+ latch.countDown();
+ journalImpl.appendRollbackRecord(1l);
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+ }
+ };
+
+ LocalThread t = new LocalThread();
+ t.start();
+ // Wait until the worker has issued all ten add records.
+ latch.await();
+
+ Thread.yield();
+
+ assertTrue(t.isAlive());
+ // Flag held callback 0 as an error, then flush only that callback.
+ factory.setCallbackAsError(0);
+
+ factory.flushCallback(0);
+
+ Thread.yield();
+ // The worker is expected to keep running while the other callbacks are still held.
+ assertTrue(t.isAlive());
+ // Flush the remaining callbacks so the worker can return.
+ factory.flushAllCallbacks();
+
+ t.join();
+ // The worker must have captured an exception.
+ assertNotNull (t.e);
+ }
+ // Commits on a worker thread with held callback 0 flagged as an error; the worker must end with an exception.
+ public void testAsynchronousCommitWithError() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 5);
+ // From here on the fake factory queues write callbacks instead of running them.
+ factory.setHoldCallbacks(true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class LocalThread extends Thread
+ {
+ Exception e;
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1,(byte)0));
+ }
+ // Signal the test thread, then commit while callbacks are still being held.
+ latch.countDown();
+ journalImpl.appendCommitRecord(1l);
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+ }
+ };
+
+ LocalThread t = new LocalThread();
+ t.start();
+ // Wait until the worker has issued all ten add records.
+ latch.await();
+
+ Thread.yield();
+
+ assertTrue(t.isAlive());
+ // Flag held callback 0 as an error, then flush only that callback.
+ factory.setCallbackAsError(0);
+
+ factory.flushCallback(0);
+
+ Thread.yield();
+ // The worker is expected to keep running while the other callbacks are still held.
+ assertTrue(t.isAlive());
+ // Flush the remaining callbacks so the worker can return.
+ factory.flushAllCallbacks();
+
+ t.join();
+ // The worker must have captured an exception.
+ assertNotNull (t.e);
+ }
+
+ // If a callback error has already arrived, the next append should throw the exception right away.
+ public void testPreviousError() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 5);
+
+ factory.setHoldCallbacks(true);
+ factory.setGenerateErrors(true);
+ // With generateErrors on, this append's held callback is flagged to report an error.
+ journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1,(byte)0));
+ // Deliver the failing callback before appending again.
+ factory.flushAllCallbacks();
+
+ try
+ {
+ journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1,(byte)0));
+ fail("Exception expected");
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ // With generateErrors on (this test never enables callback holding), a non-transactional append should throw.
+ public void testSyncNonTransaction() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 5);
+
+ factory.setGenerateErrors(true);
+
+ try
+ {
+ journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0));
+ fail("Exception expected");
+ }
+ catch (Exception ignored)
+ {
+ // Expected: the append must fail.
+ }
+
+
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ // Resets per-test state; the factory and journal themselves are created later by setupJournal().
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ records = new ArrayList<RecordInfo>();
+
+ transactions = new ArrayList<PreparedTransactionInfo>();
+
+ factory = null;
+
+ journalImpl = null;
+
+ }
+ // Stops the journal if a test created one; anything thrown while stopping is deliberately ignored.
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (journalImpl != null)
+ {
+ try
+ {
+ journalImpl.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ // Private -------------------------------------------------------
+ private void setupJournal(final int journalSize, final int alignment,
+ final int numberOfMinimalFiles) throws Exception
+ { // Creates the fake factory on first use, (re)creates and starts journalImpl, then loads it into records/transactions.
+ if (factory == null)
+ {
+ factory = new FakeSequentialFileFactory(alignment, true);
+ }
+ // Stop a journal left over from an earlier call before replacing it.
+ if (journalImpl != null)
+ {
+ journalImpl.stop();
+ }
+ // Argument order as in JournalImplTestBase.createJournal: file size, min files, sync, sync, factory, prefix, extension, maxAIO.
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true,
+ true, factory, "tt", "tt", 1000);
+
+ journalImpl.start();
+ // Empty both lists so load() results are not mixed with earlier contents.
+ records.clear();
+ transactions.clear();
+
+ journalImpl.load(records, transactions);
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -141,7 +141,7 @@
public void createJournal() throws Exception
{
journal =
- new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, filePrefix, fileExtension, maxAIO, 120000);
+ new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, filePrefix, fileExtension, maxAIO);
journal.disableAutoReclaiming();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -125,7 +125,7 @@
{
try
{
- new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -136,7 +136,7 @@
try
{
- new JournalImpl(10 * 1024, 1, true, true, fileFactory, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 1, true, true, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -147,7 +147,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, null, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, null, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -158,7 +158,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, null, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, null, fileExtension, 1);
fail("Should throw exception");
}
@@ -169,7 +169,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 1);
fail("Should throw exception");
}
@@ -180,7 +180,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 0, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 0);
fail("Should throw exception");
}
@@ -188,18 +188,7 @@
{
//Ok
}
-
- try
- {
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, fileExtension, 0, -1);
-
- fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- //Ok
- }
-
+
}
public void testFilesImmediatelyAfterload() throws Exception
@@ -2133,6 +2122,7 @@
for (int i = 0; i < 100; i++)
{
+ System.out.println("i=" + i);
byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
journal.appendAddRecord(i, (byte)0, record);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -74,7 +74,7 @@
expectedFiles.add(fileName);
- SequentialFile sf = factory.createSequentialFile(fileName, 1, 120);
+ SequentialFile sf = factory.createSequentialFile(fileName, 1);
sf.open();
@@ -85,10 +85,10 @@
//Create a couple with a different extension - they shouldn't be picked up
- SequentialFile sf1 = factory.createSequentialFile("different.file", 1, 120);
+ SequentialFile sf1 = factory.createSequentialFile("different.file", 1);
sf1.open();
- SequentialFile sf2 = factory.createSequentialFile("different.cheese", 1, 120);
+ SequentialFile sf2 = factory.createSequentialFile("different.cheese", 1);
sf2.open();
List<String> fileNames = factory.listFiles("jbm");
@@ -119,7 +119,7 @@
public void testFill() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("fill.jbm", 1, 120);
+ SequentialFile sf = factory.createSequentialFile("fill.jbm", 1);
sf.open();
@@ -144,11 +144,11 @@
public void testDelete() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("delete-me.jbm", 1, 120);
+ SequentialFile sf = factory.createSequentialFile("delete-me.jbm", 1);
sf.open();
- SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", 1, 120);
+ SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", 1);
sf2.open();
@@ -174,7 +174,7 @@
public void testWriteandRead() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("write.jbm", 1, 120);
+ SequentialFile sf = factory.createSequentialFile("write.jbm", 1);
sf.open();
@@ -237,7 +237,7 @@
public void testPosition() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("position.jbm", 1, 120);
+ SequentialFile sf = factory.createSequentialFile("position.jbm", 1);
sf.open();
@@ -303,7 +303,7 @@
public void testOpenClose() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("openclose.jbm", 1, 120);
+ SequentialFile sf = factory.createSequentialFile("openclose.jbm", 1);
sf.open();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -57,8 +57,10 @@
private volatile boolean holdCallbacks;
- private final List<Runnable> callbacksInHold;
+ private volatile boolean generateErrors;
+ private final List<CallbackRunnable> callbacksInHold;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -67,7 +69,7 @@
{
this.alignment = alignment;
this.supportsCallback = supportsCallback;
- callbacksInHold = new ArrayList<Runnable>();
+ callbacksInHold = new ArrayList<CallbackRunnable>();
}
public FakeSequentialFileFactory()
@@ -79,7 +81,7 @@
// Public --------------------------------------------------------
- public SequentialFile createSequentialFile(final String fileName, final int maxAIO, final long timeout) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final int maxAIO) throws Exception
{
FakeSequentialFile sf = fileMap.get(fileName);
@@ -152,7 +154,17 @@
{
this.holdCallbacks = holdCallbacks;
}
+ // Returns whether newly created callback actions are being flagged to report an error.
+ public boolean isGenerateErrors()
+ {
+ return generateErrors;
+ }
+ public void setGenerateErrors(boolean generateErrors)
+ { // When true, callback actions created afterwards are flagged with setSendError(true).
+ this.generateErrors = generateErrors;
+ }
+
public void flushAllCallbacks()
{
for (Runnable action : callbacksInHold)
@@ -170,6 +182,11 @@
callbacksInHold.remove(run);
}
+ public void setCallbackAsError(int position)
+ { // Flags the held callback at this index of callbacksInHold so that running it reports an error.
+ callbacksInHold.get(position).setSendError(true);
+ }
+
public int getNumberOfCallbacks()
{
return callbacksInHold.size();
@@ -194,7 +211,48 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ // One deferred write completion: run() either reports a fake error or stores the bytes and signals done().
+ private class CallbackRunnable implements Runnable
+ {
+ // Target file, the bytes to store in it, the callback to notify, and the error flag read by run().
+ final FakeSequentialFile file;
+ final ByteBuffer bytes;
+ final IOCallback callback;
+ volatile boolean sendError;
+
+ CallbackRunnable(FakeSequentialFile file, ByteBuffer bytes, IOCallback callback)
+ {
+ this.file = file;
+ this.bytes = bytes;
+ this.callback = callback;
+ }
+
+ public void run()
+ {
+ // NOTE(review): the error branch calls callback.onError without the null check used on the success branch; confirm callback is never null when sendError is set.
+ if (sendError)
+ {
+ callback.onError(1, "Fake aio error");
+ }
+ else
+ {
+ file.data.put(bytes);
+ if (callback!=null) callback.done();
+ }
+ }
+
+ public boolean isSendError()
+ {
+ return sendError;
+ }
+
+ public void setSendError(boolean sendError)
+ {
+ this.sendError = sendError;
+ }
+ }
+
public class FakeSequentialFile implements SequentialFile
{
private volatile boolean open;
@@ -321,20 +379,14 @@
checkAndResize(bytes.capacity() + position);
- Runnable action = new Runnable()
+ CallbackRunnable action = new CallbackRunnable(this, bytes, callback);
+
+ if (generateErrors)
{
+ action.setSendError(true);
+ }
- public void run()
- {
-
- data.put(bytes);
-
- if (callback!=null) callback.done();
- }
-
- };
-
- if (holdCallbacks && callback != null)
+ if (holdCallbacks)
{
FakeSequentialFileFactory.this.callbacksInHold.add(action);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -801,8 +801,7 @@
assertEquals("jbm-data", messageJournal.getFilePrefix());
assertEquals("jbm", messageJournal.getFileExtension());
assertEquals(config.getJournalMaxAIO(), messageJournal.getMaxAIO());
- assertEquals(config.getJournalAIOTimeout(), messageJournal.getAIOTimeout());
-
+
assertNotNull(jsm.getBindingsJournal());
TestableJournal bindingsJournal = (TestableJournal)jsm.getBindingsJournal();
@@ -814,7 +813,6 @@
assertEquals("jbm-bindings", bindingsJournal.getFilePrefix());
assertEquals("bindings", bindingsJournal.getFileExtension());
assertEquals(1, bindingsJournal.getMaxAIO());
- assertEquals(1, bindingsJournal.getAIOTimeout());
}
private EncodingSupport encodingMatch(final byte expectedRecord[])
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/VariableLatchTest.java 2008-07-24 16:05:48 UTC (rev 4727)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/VariableLatchTest.java 2008-07-24 17:09:06 UTC (rev 4728)
@@ -85,7 +85,10 @@
{
try
{
- latch.waitCompletion(5000);
+ if (!latch.waitCompletion(5000))
+ {
+ log.error("Latch timed out");
+ }
}
catch (Exception e)
{
@@ -244,7 +247,10 @@
readyLatch.countDown();
try
{
- latch.waitCompletion(1000);
+ if (!latch.waitCompletion(1000))
+ {
+ log.error("Latch timed out!", new Exception ("trace"));
+ }
}
catch (Exception e)
{
@@ -287,7 +293,7 @@
assertNull(t.e);
- latch.waitCompletion(1000);
+ assertTrue(latch.waitCompletion(1000));
assertEquals(0, latch.getCount());
More information about the jboss-cvs-commits
mailing list