[jboss-cvs] JBoss Messaging SVN: r4746 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 29 15:40:18 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-29 15:40:18 -0400 (Tue, 29 Jul 2008)
New Revision: 4746
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Journal: Fix file reclaiming under concurrent adds and deletes, plus a few other tweaks
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/config/jbm-configuration.xml 2008-07-29 19:40:18 UTC (rev 4746)
@@ -95,13 +95,13 @@
<!-- 10 MB journal file size -->
<journal-file-size>10485760</journal-file-size>
- <journal-min-files>10</journal-min-files>
+ <journal-min-files>15</journal-min-files>
<!-- Maximum simultaneous asynchronous writes accepted by the native layer.
(parameter ignored on NIO)
You can verify the maximum AIO at the OS level in /proc/sys/fs/aio-max-nr. (/proc/sys/fs/aio-nr shows how much of that limit is currently in use)
-->
- <journal-max-aio>5000</journal-max-aio>
+ <journal-max-aio>10000</journal-max-aio>
</configuration>
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -290,7 +290,6 @@
// Native
// ------------------------------------------------------------------------------------------
- @SuppressWarnings("unchecked")
private static native long init(String fileName, int maxIO, Logger logger);
private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -39,6 +39,13 @@
*/
void open() throws Exception;
+ /**
+ * For certain operations (like loading) we don't need to open the file with the full maxIO
+ * @param maxIO
+ * @throws Exception
+ */
+ void open(int maxIO) throws Exception;
+
int getAlignment() throws Exception;
int calculateBlockStart(int position) throws Exception;
@@ -59,6 +66,8 @@
void position(int pos) throws Exception;
+ int position() throws Exception;
+
void close() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -166,20 +166,31 @@
return fileName;
}
- public synchronized void open() throws Exception
+ public void open() throws Exception
{
+ open(maxIO);
+ }
+
+ public synchronized void open(int currentMaxIO) throws Exception
+ {
opened = true;
executor = Executors.newSingleThreadExecutor();
aioFile = newFile();
- aioFile.open(journalDir + "/" + fileName, maxIO);
+ aioFile.open(journalDir + "/" + fileName, currentMaxIO);
position.set(0);
}
+
public void position(final int pos) throws Exception
{
position.set(pos);
}
+ public int position() throws Exception
+ {
+ return (int)position.get();
+ }
+
public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
int bytesToRead = bytes.limit();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -47,11 +48,11 @@
private int offset;
- private int posCount;
+ private final AtomicInteger posCount = new AtomicInteger(0);
private boolean canReclaim;
- private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
+ private Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
public JournalFileImpl(final SequentialFile file, final int orderingID)
{
@@ -62,7 +63,7 @@
public int getPosCount()
{
- return posCount;
+ return posCount.intValue();
}
public boolean isCanReclaim()
@@ -77,16 +78,12 @@
public void incNegCount(final JournalFile file)
{
- Integer count = negCounts.get(file);
-
- int c = count == null ? 1 : count.intValue() + 1;
-
- negCounts.put(file, c);
+ getOrCreateNegCount(file).incrementAndGet();
}
public int getNegCount(final JournalFile file)
{
- Integer count = negCounts.get(file);
+ AtomicInteger count = negCounts.get(file);
if (count == null)
{
@@ -100,12 +97,12 @@
public void incPosCount()
{
- posCount++;
+ posCount.incrementAndGet();
}
public void decPosCount()
{
- posCount--;
+ posCount.decrementAndGet();
}
public void extendOffset(final int delta)
@@ -151,13 +148,27 @@
{
StringBuilder builder = new StringBuilder();
- for (Entry<JournalFile, Integer> entry: negCounts.entrySet())
+ for (Entry<JournalFile, AtomicInteger> entry: negCounts.entrySet())
{
builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n");
}
return builder.toString();
}
+
+ private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file)
+ {
+ AtomicInteger count = negCounts.get(file);
+
+ if (count == null)
+ {
+ count = new AtomicInteger();
+ negCounts.put(file, count);
+ }
+
+ return count;
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -199,6 +199,13 @@
private static final boolean trace = log.isTraceEnabled();
+ // This method exists just to make debugging easier:
+ // log.trace can be temporarily replaced by log.info here while debugging the Journal
+ private static final void trace(String message)
+ {
+ log.trace(message);
+ }
+
// Constructors --------------------------------------------------
public JournalImpl(final int fileSize, final int minFiles,
@@ -276,9 +283,18 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-
- posFilesMap.put(id, new PosFiles(usedFile));
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+
+ posFilesMap.put(id, new PosFiles(usedFile));
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -302,9 +318,20 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
- posFilesMap.put(id, new PosFiles(usedFile));
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+
+ posFilesMap.put(id, new PosFiles(usedFile));
+ }
+ finally
+ {
+ lock.release();
+ }
+
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -334,9 +361,18 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
- posFiles.addUpdateFile(usedFile);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+
+ posFiles.addUpdateFile(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
@@ -366,9 +402,18 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-
- posFiles.addUpdateFile(usedFile);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+
+ posFiles.addUpdateFile(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendDeleteRecord(long id) throws Exception
@@ -395,8 +440,18 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
- posFiles.addDelete(usedFile);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+
+ posFiles.addDelete(usedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
public long getTransactionID()
@@ -428,11 +483,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ tx.addPositive(usedFile, id);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -456,11 +521,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ tx.addPositive(usedFile, id);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
@@ -484,11 +559,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ tx.addPositive(usedFile, id);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
@@ -513,11 +598,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ tx.addPositive(usedFile, id);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -538,11 +633,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addNegative(usedFile, id);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ tx.addNegative(usedFile, id);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendPrepareRecord(final long txID) throws Exception
@@ -559,9 +664,22 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- JournalFile usedFile = writeTransaction(PREPARE_RECORD, txID, tx);
+ ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx);
- tx.prepare(usedFile);
+
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+
+ tx.prepare(usedFile);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void appendCommitRecord(final long txID) throws Exception
@@ -571,20 +689,35 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.get(txID);
+ JournalTransaction tx = transactionInfos.remove(txID);
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- JournalFile usedFile = writeTransaction(COMMIT_RECORD, txID, tx);
+ ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx);
- transactionInfos.remove(txID);
- transactionCallbacks.remove(txID);
- tx.commit(usedFile);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+
+ transactionCallbacks.remove(txID);
+
+ tx.commit(usedFile);
+
+ }
+ finally
+ {
+ lock.release();
+ }
+
+
+
}
public void appendRollbackRecord(final long txID) throws Exception
@@ -611,11 +744,21 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
- transactionCallbacks.remove(txID);
-
- tx.rollback(usedFile);
+ try
+ {
+ lock.acquire();
+
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+
+ transactionCallbacks.remove(txID);
+
+ tx.rollback(usedFile);
+
+ }
+ finally
+ {
+ lock.release();
+ }
}
public synchronized long load(final List<RecordInfo> committedRecords,
@@ -676,7 +819,7 @@
List<JournalFile> orderedFiles = orderFiles();
- int lastDataPos = -1;
+ int lastDataPos = SIZE_HEADER;
long maxTransactionID = -1;
@@ -684,7 +827,7 @@
for (JournalFile file: orderedFiles)
{
- file.getFile().open();
+ file.getFile().open(1);
ByteBuffer bb = fileFactory.newBuffer(fileSize);
@@ -1224,7 +1367,7 @@
{
//File can be reclaimed or deleted
- if (trace) log.trace("Reclaiming file " + file);
+ if (trace) trace("Reclaiming file " + file);
dataFiles.remove(file);
@@ -1239,7 +1382,7 @@
}
else
{
- file.getFile().open();
+ file.getFile().open(1);
file.getFile().delete();
}
@@ -1390,7 +1533,7 @@
SequentialFile sf = file.getFile();
- sf.open();
+ sf.open(1);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
@@ -1460,7 +1603,7 @@
}
/** a method that shares the logic of writing a complete transaction between COMMIT and PREPARE */
- private JournalFile writeTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
+ private ByteBuffer writeTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
{
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
@@ -1481,8 +1624,7 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- return usedFile;
+ return bb;
}
private boolean isTransaction(final byte recordType)
@@ -1552,7 +1694,7 @@
{
SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
- file.open();
+ file.open(1);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
@@ -1587,41 +1729,35 @@
return orderedFiles;
}
+ /**
+ * The caller must acquire the lock (lock.acquire()) before calling this method
+ * */
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
- lock.acquire();
int size = bb.capacity();
-
- try
- {
- checkFile(size);
- bb.position(SIZE_BYTE);
- if (currentFile == null)
+ checkFile(size);
+ bb.position(SIZE_BYTE);
+ if (currentFile == null)
+ {
+ throw new Exception ("Current file = null");
+ }
+ bb.putInt(currentFile.getOrderingID());
+ bb.rewind();
+ if (callback != null)
+ {
+ currentFile.getFile().write(bb, callback);
+ if (sync)
{
- throw new Exception ("Current file = null");
+ callback.waitCompletion();
}
- bb.putInt(currentFile.getOrderingID());
- bb.rewind();
- if (callback != null)
- {
- currentFile.getFile().write(bb, callback);
- if (sync)
- {
- callback.waitCompletion();
- }
- }
- else
- {
- currentFile.getFile().write(bb, sync);
- }
- currentFile.extendOffset(size);
- return currentFile;
}
- finally
+ else
{
- lock.release();
+ currentFile.getFile().write(bb, sync);
}
+ currentFile.extendOffset(size);
+ return currentFile;
}
private JournalFile createFile(boolean keepOpened) throws Exception
@@ -1724,7 +1860,7 @@
{
try
{
- checkAndReclaimFiles();
+ checkAndReclaimFiles();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -74,9 +74,9 @@
{
return fileName;
}
-
- public void open() throws Exception
- {
+
+ public synchronized void open() throws Exception
+ {
file = new File(journalDir + "/" + fileName);
rfile = new RandomAccessFile(file, "rw");
@@ -84,6 +84,11 @@
channel = rfile.getChannel();
}
+ public void open(int currentMaxIO) throws Exception
+ {
+ open();
+ }
+
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -190,4 +195,9 @@
channel.position(pos);
}
+ public int position() throws Exception
+ {
+ return (int) channel.position();
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -51,6 +51,13 @@
{
private static final Logger log = Logger.getLogger(Reclaimer.class);
+ private static boolean trace = log.isTraceEnabled();
+
+ private static void trace(String message)
+ {
+ log.trace(message);
+ }
+
public void scan(final JournalFile[] files)
{
for (int i = 0; i < files.length; i++)
@@ -62,9 +69,22 @@
int posCount = currentFile.getPosCount();
int totNeg = 0;
-
+
+ if (trace)
+ {
+ trace("posCount on " + currentFile + " = " + posCount);
+ }
+
for (int j = i; j < files.length; j++)
{
+ if (trace)
+ {
+ if (files[j].getNegCount(currentFile) != 0)
+ {
+ trace("Negative from " + files[j] + " = " + files[j].getNegCount(currentFile));
+ }
+ }
+
totNeg += files[j].getNegCount(currentFile);
}
@@ -88,6 +108,11 @@
}
else
{
+ if (trace)
+ {
+ trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
+ }
+
currentFile.setCanReclaim(false);
break;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -25,6 +25,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -33,7 +38,6 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.jboss.messaging.tests.util.UnitTestCase;
@@ -862,7 +866,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(50,(byte) 1));
+ journalImpl.appendAddRecordTransactional(1, i, (byte) 1, new SimpleEncoding(50,(byte) 1));
journalImpl.forceMoveNextFile();
}
@@ -997,6 +1001,96 @@
}
+ public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception
+ {
+ final int JOURNAL_SIZE = 10 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ final CountDownLatch latchReady = new CountDownLatch(2);
+ final CountDownLatch latchStart = new CountDownLatch(1);
+ final AtomicInteger finishedOK = new AtomicInteger(0);
+ final BlockingQueue<Integer> queueDelete = new LinkedBlockingQueue<Integer>();
+
+ final int NUMBER_OF_ELEMENTS = 500;
+
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ latchStart.await();
+ for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
+ {
+ journalImpl.appendAddRecordTransactional((long)i, i, (byte) 1, new SimpleEncoding(50,(byte) 1));
+ journalImpl.appendCommitRecord((long)i);
+ queueDelete.offer(i);
+ }
+ finishedOK.incrementAndGet();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread t2 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ latchStart.await();
+ for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
+ {
+ Integer toDelete = queueDelete.poll(10, TimeUnit.SECONDS);
+ if (toDelete == null)
+ {
+ break;
+ }
+ journalImpl.appendDeleteRecord(toDelete);
+ }
+ finishedOK.incrementAndGet();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ t1.start();
+ t2.start();
+
+ latchReady.await();
+ latchStart.countDown();
+
+ t1.join();
+ t2.join();
+
+ assertEquals(2, finishedOK.intValue());
+
+ journalImpl.forceMoveNextFile();
+
+ journalImpl.checkAndReclaimFiles();
+
+ assertEquals(0, journalImpl.getDataFilesCount());
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-29 19:40:18 UTC (rev 4746)
@@ -302,9 +302,14 @@
{
return fileName;
}
-
+
public void open() throws Exception
{
+ open(0);
+ }
+
+ public synchronized void open(int currentMaxIO) throws Exception
+ {
open = true;
}
@@ -363,6 +368,11 @@
data.position(pos);
}
+
+ public int position() throws Exception
+ {
+ return data.position();
+ }
public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
More information about the jboss-cvs-commits
mailing list