[jboss-cvs] JBoss Messaging SVN: r4914 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 5 16:41:22 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-09-05 16:41:22 -0400 (Fri, 05 Sep 2008)
New Revision: 4914
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1299 - minor tidy-up of the XA code
(removes the Journal's dependency on Xid by moving the encoding and decoding of XIDs into JournalStorageManager)
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -64,7 +64,7 @@
void appendCommitRecord(long txID) throws Exception;
- void appendPrepareRecord(long txID, Xid xid) throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionIdentifier) throws Exception;
void appendRollbackRecord(long txID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.journal;
-import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -39,15 +38,15 @@
public class PreparedTransactionInfo
{
public final long id;
- public final Xid xid;
+ public final byte[] xidData;
public final List<RecordInfo> records = new ArrayList<RecordInfo>();
public final Set<Long> recordsToDelete = new HashSet<Long>();
- public PreparedTransactionInfo(final long id, final Xid xid)
+ public PreparedTransactionInfo(final long id, final byte[] xidData)
{
this.id = id;
- this.xid = xid;
+ this.xidData = xidData;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -26,12 +26,9 @@
import org.jboss.messaging.core.journal.*;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.XidCodecSupport;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.VariableLatch;
-import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
@@ -627,7 +624,7 @@
}
}
- public void appendPrepareRecord(final long txID, Xid xid) throws Exception
+ public void appendPrepareRecord(final long txID, EncodingSupport xid) throws Exception
{
if (state != STATE_LOADED)
{
@@ -857,6 +854,8 @@
// The variable record portion used on Updates and Appends
int variableSize = 0;
+ // Used to hold XIDs on PrepareTransactions
+ int preparedTransactionDataSize = 0;
byte userRecordType = 0;
byte record[] = null;
@@ -884,25 +883,25 @@
{
if(recordType == PREPARE_RECORD)
{
- variableSize = bb.getInt();
+ preparedTransactionDataSize = bb.getInt();
}
variableSize += bb.getInt() * SIZE_INT * 2;
}
int recordSize = getRecordSize(recordType);
- if (pos + recordSize + variableSize > fileSize)
+ if (pos + recordSize + variableSize + preparedTransactionDataSize > fileSize)
{
continue;
}
int oldPos = bb.position();
- bb.position(pos + variableSize + recordSize - SIZE_INT);
+ bb.position(pos + variableSize + recordSize + preparedTransactionDataSize - SIZE_INT);
int checkSize = bb.getInt();
- if (checkSize != variableSize + recordSize)
+ if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
{
log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
// If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
@@ -1017,20 +1016,17 @@
TransactionHolder tx = transactions.get(transactionID);
// We need to read it even if transaction was not found, or the reading checks would fail
+
+ byte xidData[] = new byte[preparedTransactionDataSize];
+ bb.get(xidData);
+
// Pair <OrderId, NumberOfElements>
- Xid xid = null;
- int formatID = bb.getInt();
- byte[] bq = new byte[bb.getInt()];
- bb.get(bq);
- byte[] gtxid = new byte[bb.getInt()];
- bb.get(gtxid);
- xid = new XidImpl(bq, formatID, gtxid);
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize - XidCodecSupport.getXidEncodeLength(xid), bb);
+ Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
if (tx != null)
{
tx.prepared = true;
- tx.xid = xid;
+ tx.xidData = xidData;
JournalTransaction journalTransaction = transactionInfos.get(transactionID);
if (journalTransaction == null)
@@ -1134,7 +1130,7 @@
checkSize = bb.getInt();
- if (checkSize != variableSize + recordSize)
+ if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
{
throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
}
@@ -1227,7 +1223,7 @@
}
else
{
- PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.xid);
+ PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.xidData);
info.records.addAll(transaction.recordInfos);
@@ -1604,9 +1600,9 @@
return bb;
}
- private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, Xid xid) throws Exception
+ private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, EncodingSupport xid) throws Exception
{
- int xidSize = XidCodecSupport.getXidEncodeLength(xid);
+ int xidSize = xid.getEncodeSize();
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2 + xidSize + SIZE_INT;
ByteBuffer bb = fileFactory.newBuffer(size);
@@ -1616,7 +1612,7 @@
bb.putLong(txID);
bb.putInt(xidSize);
bb.putInt(tx.getElementsSummary().size());
- XidCodecSupport.encodeXid(xid, new ByteBufferWrapper(bb));
+ xid.encode(new ByteBufferWrapper(bb));
for (Map.Entry<Integer, AtomicInteger> entry: tx.getElementsSummary().entrySet())
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -56,6 +56,6 @@
public boolean invalid;
- public Xid xid;
+ public byte[] xidData;
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.XidCodecSupport;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -255,7 +256,7 @@
public void prepare(long txID, Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, xid);
+ messageJournal.appendPrepareRecord(txID, new EncodingXid(xid));
}
public void commit(long txID) throws Exception
@@ -302,7 +303,10 @@
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
{
log.trace(preparedTransaction);
- Transaction tx = new TransactionImpl(preparedTransaction.id, preparedTransaction.xid, this, postOffice);
+ EncodingXid encodingXid = new EncodingXid(preparedTransaction.xidData);
+ Xid xid = encodingXid.xid;
+
+ Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
List<ServerMessage> messages = new ArrayList<ServerMessage>();
List<ServerMessage> messagesToDelete = new ArrayList<ServerMessage>();
//first get any sent messages for this tx and recreate
@@ -345,7 +349,7 @@
}
//now we recreate the state of the tx and add to th erresource manager
tx.replay(messages, messagesToDelete, Transaction.State.PREPARED);
- resourceManager.putTransaction(preparedTransaction.xid, tx);
+ resourceManager.putTransaction(xid, tx);
//and finally since we've dealt with the records we don't need to process them.
for (RecordInfo recordInfo : recordsToDelete)
{
@@ -765,5 +769,38 @@
}
// Inner Classes ----------------------------------------------------------------------------
+
+ private static class EncodingXid implements EncodingSupport
+ {
+
+ final Xid xid;
+
+ EncodingXid(Xid xid)
+ {
+ this.xid = xid;
+ }
+
+ EncodingXid(byte[] data)
+ {
+ xid = XidCodecSupport.decodeXid(new ByteBufferWrapper(ByteBuffer.wrap(data)));
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ throw new IllegalStateException("Non Supported Operation");
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ XidCodecSupport.encodeXid(xid, buffer);
+ }
+
+ public int getEncodeSize()
+ {
+ return XidCodecSupport.getXidEncodeLength(xid);
+ }
+
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -812,9 +812,9 @@
}
journalImpl.forceMoveNextFile();
-
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
- journalImpl.appendPrepareRecord(1l, xid);
+ SimpleEncoding xidEncoding = new SimpleEncoding(10, (byte)'a');
+
+ journalImpl.appendPrepareRecord(1l, xidEncoding);
journalImpl.appendCommitRecord(1l);
for (int i=0;i<10;i++)
@@ -872,6 +872,40 @@
}
+ public void testSimplePrepare() throws Exception
+ {
+ final int JOURNAL_SIZE = 3 * 1024;
+
+ setupJournal(JOURNAL_SIZE, 1);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(50,(byte) 1));
+
+ SimpleEncoding xid = new SimpleEncoding(10, (byte)1);
+
+ journalImpl.appendPrepareRecord(1, xid);
+
+ journalImpl.appendAddRecord(2l, (byte)1, new SimpleEncoding(10, (byte)1));
+
+ journalImpl.debugWait();
+
+ setupJournal(JOURNAL_SIZE, 1);
+
+ assertEquals(1, transactions.size());
+ assertEquals(1, records.size());
+
+ assertEquals(10, transactions.get(0).xidData.length);
+
+ for (int i = 0; i < 10; i++)
+ {
+ assertEquals((byte)1, transactions.get(0).xidData[i]);
+ }
+
+
+ }
+
public void testReloadWithPreparedTransaction() throws Exception
{
@@ -890,9 +924,10 @@
journalImpl.debugWait();
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
- journalImpl.appendPrepareRecord(1l, xid);
+ SimpleEncoding xid1 = new SimpleEncoding(10, (byte)1);
+ journalImpl.appendPrepareRecord(1l, xid1);
+
assertEquals(12, factory.listFiles("tt").size());
setupJournal(JOURNAL_SIZE, 1024);
@@ -900,6 +935,12 @@
assertEquals(0, records.size());
assertEquals(1, transactions.size());
+ assertEquals(10, transactions.get(0).xidData.length);
+ for (int i = 0; i < 10; i++)
+ {
+ assertEquals((byte)1, transactions.get(0).xidData[i]);
+ }
+
journalImpl.checkAndReclaimFiles();
assertEquals(10, journalImpl.getDataFilesCount());
@@ -919,10 +960,21 @@
journalImpl.appendDeleteRecordTransactional(2l, (long)i);
}
- journalImpl.appendPrepareRecord(2l, xid);
+ SimpleEncoding xid2 = new SimpleEncoding(15, (byte)2);
+
+ journalImpl.appendPrepareRecord(2l, xid2);
setupJournal(JOURNAL_SIZE, 1);
+ assertEquals(1, transactions.size());
+
+ assertEquals(15, transactions.get(0).xidData.length);
+
+ for (int i = 0; i < transactions.get(0).xidData.length; i++)
+ {
+ assertEquals(2, transactions.get(0).xidData[i]);
+ }
+
assertEquals(10, journalImpl.getDataFilesCount());
assertEquals(12, factory.listFiles("tt").size());
@@ -959,8 +1011,7 @@
journalImpl.forceMoveNextFile();
}
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
- journalImpl.appendPrepareRecord(1l, xid);
+ journalImpl.appendPrepareRecord(1l, new SimpleEncoding(13, (byte)0));
setupJournal(JOURNAL_SIZE, 100);
assertEquals(0, records.size());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -288,7 +289,7 @@
journal.debugWait();
}
- protected void prepare(long txID, Xid xid) throws Exception
+ protected void prepare(long txID, EncodingSupport xid) throws Exception
{
TransactionHolder tx = transactions.get(txID);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -22,10 +22,12 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import javax.transaction.xa.Xid;
import java.util.List;
@@ -1600,7 +1602,7 @@
assertEquals(1, journal.getOpenedFilesCount());
assertEquals(1, journal.getIDMapSize());
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
prepare(1, xid); // in file 1
List<String> files3 = fileFactory.listFiles(fileExtension);
@@ -1740,7 +1742,7 @@
assertEquals(0, journal.getFreeFilesCount());
assertEquals(1, journal.getIDMapSize());
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
prepare(1, xid); // in file 1
List<String> files3 = fileFactory.listFiles(fileExtension);
@@ -2622,7 +2624,8 @@
updateTx(1, 1, 2, 3, 4, 7, 8);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
stopJournal();
createJournal();
@@ -2639,7 +2642,9 @@
addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
updateTx(1, 1, 2,3, 4, 7, 8);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
commit(1);
stopJournal();
@@ -2657,7 +2662,9 @@
addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
updateTx(1, 1, 2,3, 4, 7, 8);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
rollback(1);
stopJournal();
@@ -2692,7 +2699,9 @@
addTx(1, 7, 8, 9, 10);
updateTx(1, 1, 2, 3, 7, 8, 9);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
stopJournal();
createJournal();
@@ -2710,7 +2719,9 @@
addTx(1, 7, 8, 9, 10);
updateTx(1, 1, 2, 3, 7, 8, 9);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
rollback(1);
stopJournal();
@@ -2729,7 +2740,9 @@
addTx(1, 7, 8, 9, 10);
updateTx(1, 1, 2, 3, 7, 8, 9);
deleteTx(1, 1, 2, 3, 4, 5);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(1, xid);
commit(1);
stopJournal();
@@ -2751,7 +2764,9 @@
addTx(3, 28, 29, 30, 31, 32, 33, 34, 35);
updateTx(3, 7, 8, 9, 10);
deleteTx(2, 4, 5, 6, 23, 25, 27);
- Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+
prepare(2, xid);
deleteTx(1, 1, 2, 11, 14, 15);
prepare(1, xid);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-09-04 22:23:32 UTC (rev 4913)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-09-05 20:41:22 UTC (rev 4914)
@@ -179,8 +179,10 @@
final long txID = 1209373;
Xid xid = new XidImpl("branch".getBytes(), 1, "globalid".getBytes());
- messageJournal.appendPrepareRecord(txID, xid);
+
+ messageJournal.appendPrepareRecord(EasyMock.eq(txID), EasyMock.isA(EncodingSupport.class));
EasyMock.replay(messageJournal, bindingsJournal);
+
jsm.prepare(txID, xid);
EasyMock.verify(messageJournal, bindingsJournal);
}
More information about the jboss-cvs-commits
mailing list