[jboss-cvs] JBoss Messaging SVN: r4689 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 17 11:34:33 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-17 11:34:33 -0400 (Thu, 17 Jul 2008)
New Revision: 4689
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
Log:
Journal work (treating holes on transactions)
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-07-17 15:34:33 UTC (rev 4689)
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -32,7 +33,6 @@
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.util.VariableLatch;
/**
@@ -116,7 +116,6 @@
private String fileName;
private Thread poller;
private int maxIO;
- private VariableLatch writeLatch = new VariableLatch();
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Lock writeLock = lock.writeLock();
private Semaphore writeSemaphore;
@@ -164,7 +163,10 @@
try
{
writeLock.lock();
- writeLatch.waitCompletion(timeout);
+ if (!writeSemaphore.tryAcquire(maxIO, timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new IllegalStateException("Timeout!");
+ }
writeSemaphore = null;
stopPoller(handler);
// We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
@@ -184,7 +186,6 @@
public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
- writeLatch.up();
writeSemaphore.acquireUninterruptibly();
try
{
@@ -193,7 +194,6 @@
catch (RuntimeException e)
{
writeSemaphore.release();
- writeLatch.down();
throw e;
}
@@ -202,7 +202,6 @@
public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
- writeLatch.up();
writeSemaphore.acquireUninterruptibly();
try
{
@@ -211,7 +210,6 @@
catch (RuntimeException e)
{
writeSemaphore.release();
- writeLatch.down();
throw e;
}
}
@@ -242,15 +240,14 @@
private void callbackDone(final AIOCallback callback)
{
writeSemaphore.release();
- writeLatch.down();
callback.done();
}
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
{
+ log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
- writeLatch.down();
callback.onError(errorCode, errorMessage);
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-17 15:34:33 UTC (rev 4689)
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -51,7 +52,7 @@
private boolean canReclaim;
- private Map<JournalFile, Integer> negCounts = new HashMap<JournalFile, Integer>();
+ private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
public JournalFileImpl(final SequentialFile file, final int orderingID)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-17 15:34:33 UTC (rev 4689)
@@ -82,8 +82,6 @@
private static final int SIZE_LONG = 8;
private static final int SIZE_INT = 4;
-
- private static final int SIZE_SHORT = 2;
private static final int SIZE_BYTE = 1;
@@ -120,16 +118,15 @@
public static final byte DELETE_RECORD_TX = 16;
- public static final int SIZE_PREPARE_RECORD = BASIC_SIZE + SIZE_LONG;
+ public static final int SIZE_PREPARE_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
public static final byte PREPARE_RECORD = 17;
+ public static final int SIZE_COMMIT_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
- public static final int SIZE_COMMIT_RECORD = BASIC_SIZE + SIZE_LONG;
-
public static final byte COMMIT_RECORD = 18;
- public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG;
+ public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
public static final byte ROLLBACK_RECORD = 19;
@@ -170,7 +167,7 @@
private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
- private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
+ private final Map<Long, JournalTransaction> transactionInfos = new ConcurrentHashMap<Long, JournalTransaction>();
private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
@@ -195,12 +192,12 @@
private volatile int state;
- private volatile long lastOrderingID;
-
private final AtomicLong transactionIDSequence = new AtomicLong(0);
private Reclaimer reclaimer = new Reclaimer();
+ private Thread shutdownHook = null;
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -438,13 +435,11 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
- usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalTransaction tx = getTransactionInfo(txID);
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addPos(usedFile, id);
+ tx.addPositive(usedFile, id);
}
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -468,13 +463,11 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
- usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+ JournalTransaction tx = getTransactionInfo(txID);
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addPos(usedFile, id);
+ tx.addPositive(usedFile, id);
}
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
@@ -498,13 +491,11 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
- usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+ JournalTransaction tx = getTransactionInfo(txID);
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addPos(usedFile, id);
+ tx.addPositive(usedFile, id);
}
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
@@ -529,13 +520,11 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
- usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+ JournalTransaction tx = getTransactionInfo(txID);
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addPos(usedFile, id);
+ tx.addPositive(usedFile, id);
}
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -556,13 +545,11 @@
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
- usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+ JournalTransaction tx = getTransactionInfo(txID);
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addNeg(usedFile, id);
+ tx.addNegative(usedFile, id);
}
public void appendPrepareRecord(final long txID) throws Exception
@@ -572,7 +559,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- TransactionNegPos tx = transactionInfos.get(txID);
+ JournalTransaction tx = transactionInfos.get(txID);
if (tx == null)
{
@@ -586,13 +573,12 @@
bb.put(PREPARE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
+ bb.putInt(tx.getNumberOfElements());
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
tx.prepare(usedFile);
}
@@ -603,7 +589,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- TransactionNegPos tx = transactionInfos.remove(txID);
+ JournalTransaction tx = transactionInfos.remove(txID);
if (tx == null)
{
@@ -616,14 +602,13 @@
bb.put(COMMIT_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
- bb.putLong(txID);
+ bb.putLong(txID);
+ bb.putInt(tx.getNumberOfElements());
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
transactionCallbacks.remove(txID);
tx.commit(usedFile);
@@ -637,7 +622,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- TransactionNegPos tx = transactionInfos.remove(txID);
+ JournalTransaction tx = transactionInfos.remove(txID);
if (tx == null)
{
@@ -651,13 +636,12 @@
bb.put(ROLLBACK_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
+ bb.putInt(tx.getNumberOfElements());
bb.putInt(size);
bb.rewind();
- JournalFile usedFile;
+ JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
transactionCallbacks.remove(txID);
tx.rollback(usedFile);
@@ -671,6 +655,8 @@
throw new IllegalStateException("Journal must be in started state");
}
+ addShutdownHook();
+
Set<Long> recordsToDelete = new HashSet<Long>();
Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
@@ -934,16 +920,16 @@
tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));
- TransactionNegPos tnp = transactionInfos.get(txID);
+ JournalTransaction tnp = transactionInfos.get(txID);
if (tnp == null)
{
- tnp = new TransactionNegPos();
+ tnp = new JournalTransaction();
transactionInfos.put(txID, tnp);
}
- tnp.addPos(file, id);
+ tnp.addPositive(file, id);
hasData = true;
@@ -972,16 +958,16 @@
tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
- TransactionNegPos tnp = transactionInfos.get(txID);
+ JournalTransaction tnp = transactionInfos.get(txID);
if (tnp == null)
{
- tnp = new TransactionNegPos();
+ tnp = new JournalTransaction();
transactionInfos.put(txID, tnp);
}
- tnp.addPos(file, id);
+ tnp.addPositive(file, id);
hasData = true;
@@ -1004,16 +990,16 @@
tx.recordsToDelete.add(id);
- TransactionNegPos tnp = transactionInfos.get(txID);
+ JournalTransaction tnp = transactionInfos.get(txID);
if (tnp == null)
{
- tnp = new TransactionNegPos();
+ tnp = new JournalTransaction();
transactionInfos.put(txID, tnp);
}
- tnp.addNeg(file, id);
+ tnp.addNegative(file, id);
hasData = true;
@@ -1021,7 +1007,8 @@
}
case PREPARE_RECORD:
{
- long txID = bb.getLong();
+ long txID = bb.getLong();
+ int numberOfElements = bb.getInt();
maxTransactionID = Math.max(maxTransactionID, txID);
@@ -1034,39 +1021,56 @@
tx.prepared = true;
- TransactionNegPos tnp = transactionInfos.get(txID);
+ JournalTransaction journalTransaction = transactionInfos.get(txID);
- if (tnp == null)
+ if (journalTransaction == null)
{
throw new IllegalStateException("Cannot find tx " + txID);
}
+
+ if (numberOfElements == journalTransaction.getNumberOfElements())
+ {
+ journalTransaction.prepare(file);
+ }
+ else
+ {
+ journalTransaction.setInvalid(true);
+ tx.invalid = true;
+ }
- tnp.prepare(file);
-
hasData = true;
break;
}
case COMMIT_RECORD:
{
- long txID = bb.getLong();
+ long txID = bb.getLong();
+ int numberOfElements = bb.getInt();
maxTransactionID = Math.max(maxTransactionID, txID);
TransactionHolder tx = transactions.remove(txID);
if (tx != null)
{
- records.addAll(tx.recordInfos);
- recordsToDelete.addAll(tx.recordsToDelete);
- TransactionNegPos tnp = transactionInfos.remove(txID);
+ JournalTransaction tnp = transactionInfos.remove(txID);
if (tnp == null)
{
throw new IllegalStateException("Cannot find tx " + txID);
}
- tnp.commit(file);
+ if (numberOfElements == tnp.getNumberOfElements())
+ {
+ records.addAll(tx.recordInfos);
+ recordsToDelete.addAll(tx.recordsToDelete);
+ tnp.commit(file);
+ }
+ else
+ {
+ log.warn("Transaction " + txID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
+ tnp.rollback(file);
+ }
hasData = true;
}
@@ -1075,7 +1079,8 @@
}
case ROLLBACK_RECORD:
{
- long txID = bb.getLong();
+ long txID = bb.getLong();
+ /* int numberOfElements = */ bb.getInt(); // Not being currently used
maxTransactionID = Math.max(maxTransactionID, txID);
@@ -1083,7 +1088,7 @@
if (tx != null)
{
- TransactionNegPos tnp = transactionInfos.remove(txID);
+ JournalTransaction tnp = transactionInfos.remove(txID);
if (tnp == null)
{
@@ -1185,11 +1190,11 @@
for (TransactionHolder transaction: transactions.values())
{
- if (!transaction.prepared)
+ if (!transaction.prepared || transaction.invalid)
{
log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
- TransactionNegPos transactionInfo = this.transactionInfos.get(transaction.transactionID);
+ JournalTransaction transactionInfo = this.transactionInfos.get(transaction.transactionID);
if (transactionInfo == null)
{
@@ -1224,7 +1229,7 @@
return this.fileFactory.getAlignment();
}
- public synchronized void checkReclaimStatus() throws Exception
+ public void checkReclaimStatus() throws Exception
{
JournalFile[] files = new JournalFile[dataFiles.size()];
@@ -1465,10 +1470,13 @@
this.closingExecutor = Executors.newSingleThreadExecutor();
state = STATE_STARTED;
+
}
public synchronized void stop() throws Exception
{
+ clearShutdownHook();
+
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
@@ -1529,6 +1537,46 @@
// Private -----------------------------------------------------------------------------
+ private void clearShutdownHook()
+ {
+ if (shutdownHook != null)
+ {
+ try
+ {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (Throwable e)
+ {
+ }
+ shutdownHook = null;
+ }
+ }
+
+ private void addShutdownHook()
+ {
+
+ clearShutdownHook();
+
+
+ shutdownHook = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ log.info("Journal being stopped");
+ JournalImpl.this.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e, e);
+ }
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ }
+
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
lock.acquire();
@@ -1562,17 +1610,6 @@
}
}
- private void repairFrom(final int pos, final JournalFile file) throws Exception
- {
- log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
- " in the record that starts at position " + pos + ". " +
- "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-
- file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
-
- file.getFile().position(pos);
- }
-
private JournalFile createFile(boolean keepOpened) throws Exception
{
int orderingID = generateOrderingID();
@@ -1658,10 +1695,6 @@
try
{
pushOpenedFile();
- if (autoReclaim)
- {
- checkAndReclaimFiles();
- }
}
catch (Exception e)
{
@@ -1669,6 +1702,23 @@
}
}
});
+ if (autoReclaim)
+ {
+ openExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ checkAndReclaimFiles();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
+ }
JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
@@ -1726,13 +1776,13 @@
});
}
- private TransactionNegPos getTransactionInfo(final long txID)
+ private JournalTransaction getTransactionInfo(final long txID)
{
- TransactionNegPos tx = transactionInfos.get(txID);
+ JournalTransaction tx = transactionInfos.get(txID);
if (tx == null)
{
- tx = new TransactionNegPos();
+ tx = new JournalTransaction();
transactionInfos.put(txID, tx);
}
@@ -1839,7 +1889,7 @@
}
}
- private class TransactionNegPos
+ private class JournalTransaction
{
private List<Pair<JournalFile, Long>> pos;
@@ -1847,25 +1897,31 @@
private Set<JournalFile> transactionPos;
- void addTXPosCount(final JournalFile file)
+ // Number of elements participating on the transaction
+ // Used to verify completion on reload
+ private final AtomicInteger numberOfElements = new AtomicInteger(0);
+
+ private boolean invalid = false;
+
+ public int getNumberOfElements()
{
- if (transactionPos == null)
- {
- transactionPos = new HashSet<JournalFile>();
- }
-
- if (!transactionPos.contains(file))
- {
- transactionPos.add(file);
-
- //We add a pos for the transaction itself in the file - this prevents any transactional operations
- //being deleted before a commit or rollback is written
- file.incPosCount();
- }
+ return numberOfElements.get();
}
- void addPos(final JournalFile file, final long id)
- {
+ public void setInvalid(boolean b)
+ {
+ this.invalid = b;
+ }
+
+ public boolean isInvalid()
+ {
+ return this.invalid;
+ }
+
+ public void addPositive(final JournalFile file, final long id)
+ {
+ numberOfElements.incrementAndGet();
+
addTXPosCount(file);
if (pos == null)
@@ -1876,8 +1932,10 @@
pos.add(new Pair<JournalFile, Long>(file, id));
}
- void addNeg(final JournalFile file, final long id)
+ public void addNegative(final JournalFile file, final long id)
{
+ numberOfElements.incrementAndGet();
+
addTXPosCount(file);
if (neg == null)
@@ -1888,7 +1946,7 @@
neg.add(new Pair<JournalFile, Long>(file, id));
}
- void commit(final JournalFile file)
+ public void commit(final JournalFile file)
{
if (pos != null)
{
@@ -1932,7 +1990,7 @@
}
}
- void rollback(JournalFile file)
+ public void rollback(JournalFile file)
{
//Now add negs for the pos we added in each file in which there were transactional operations
//Note that we do this on rollback as we do on commit, since we need to ensure the file containing
@@ -1946,14 +2004,14 @@
}
}
- void prepare(JournalFile file)
+ public void prepare(JournalFile file)
{
//We don't want the prepare record getting deleted before time
addTXPosCount(file);
}
- void forget()
+ public void forget()
{
//The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
@@ -1962,6 +2020,25 @@
jf.decPosCount();
}
}
+
+ private void addTXPosCount(final JournalFile file)
+ {
+ if (transactionPos == null)
+ {
+ transactionPos = new HashSet<JournalFile>();
+ }
+
+ if (!transactionPos.contains(file))
+ {
+ transactionPos.add(file);
+
+ //We add a pos for the transaction itself in the file - this prevents any transactional operations
+ //being deleted before a commit or rollback is written
+ file.incPosCount();
+ }
+ }
+
+
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-07-17 15:34:33 UTC (rev 4689)
@@ -34,6 +34,7 @@
* A TransactionHolder
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
@@ -52,4 +53,6 @@
public boolean prepared;
+ public boolean invalid;
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-17 15:34:33 UTC (rev 4689)
@@ -141,9 +141,16 @@
/* ID */15l,
JournalImpl.SIZE_DELETE_RECORD_TX)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_DELETE_RECORD_TX);
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.PREPARE_RECORD,
+ /*FileID*/1,
+ /* Transaction ID*/ 100l,
+ /* Number of Elements */ 1,
+ JournalImpl.SIZE_PREPARE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_PREPARE_RECORD);
+
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
/*FileID*/1,
/* Transaction ID*/ 100l,
+ /* Number of Elements */ 1,
JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
EasyMock.replay(mockFactory, file1, file2);
@@ -152,6 +159,8 @@
journalImpl.appendDeleteRecordTransactional(100l, 15l);
+ journalImpl.appendPrepareRecord(100l);
+
journalImpl.appendCommitRecord(100l);
EasyMock.verify(mockFactory, file1, file2);
@@ -190,11 +199,13 @@
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.PREPARE_RECORD,
/*FileID*/1,
/* TXID */ 3l,
+ /* Number Of Elements */ 2,
JournalImpl.SIZE_PREPARE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_PREPARE_RECORD);
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
/*FileID*/1,
/* TXID */ 3l,
+ /* Number Of Elements */ 2,
JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
EasyMock.replay(mockFactory, file1, file2);
@@ -228,6 +239,7 @@
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ROLLBACK_RECORD,
/*FileID*/1,
/* TXID */ 3l,
+ /* NumberOfElements */ 1,
JournalImpl.SIZE_ROLLBACK_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ROLLBACK_RECORD);
EasyMock.replay(mockFactory, file1, file2);
@@ -313,6 +325,7 @@
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
/*FileID*/1,
/* Transaction ID*/ 33l,
+ /* NumberOfElements*/ 2,
JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
EasyMock.replay(mockFactory, file1, file2);
@@ -446,6 +459,7 @@
public void appendTo(StringBuffer buffer)
{
+ buffer.append("ByteArray");
}
public boolean matches(Object argument)
More information about the jboss-cvs-commits mailing list