[jboss-cvs] JBoss Messaging SVN: r4933 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 11 04:31:24 EDT 2008
Author: timfox
Date: 2008-09-11 04:31:24 -0400 (Thu, 11 Sep 2008)
New Revision: 4933
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
Log:
Some tweaks to journal
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-11 08:31:24 UTC (rev 4933)
@@ -676,7 +676,7 @@
/**
* <p>Load data accordingly to the record layouts</p>
*
- * <p>Basic record laytout:</p>
+ * <p>Basic record layout:</p>
* <table border=1>
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-11 08:31:24 UTC (rev 4933)
@@ -47,7 +47,7 @@
public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
- public final ArrayList<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
+ public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public boolean prepared;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-11 08:31:24 UTC (rev 4933)
@@ -225,7 +225,9 @@
// Instead of updating the record, we delete the old one as that is better for reclaiming
messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), null);
}
+
pageTransaction.setRecordID(generateID());
+
messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
}
@@ -236,7 +238,9 @@
// To avoid linked list effect on reclaiming, we delete and add a new record, instead of simply updating it
messageJournal.appendDeleteRecordTransactional(txID, lastPage.getRecordId(), null);
}
+
lastPage.setRecordId(generateID());
+
messageJournal.appendAddRecordTransactional(txID, lastPage.getRecordId(), LAST_PAGE, lastPage);
}
@@ -257,7 +261,7 @@
public void prepare(long txID, Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new EncodingXid(xid));
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid));
}
public void commit(long txID) throws Exception
@@ -321,8 +325,8 @@
long messageID = record.id;
ACKEncoding encoding = new ACKEncoding();
+
encoding.decode(buff);
-
Queue queue = queues.get(encoding.queueID);
@@ -364,12 +368,10 @@
reference.setDeliveryCount(deliveryUpdate.count);
- break;
-
+ break;
}
case PAGE_TRANSACTION:
- {
-
+ {
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
@@ -492,7 +494,9 @@
final List<Binding> bindings, final List<SimpleString> destinations) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
+
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+
long maxID = bindingsJournal.load(records, preparedTransactions);
for (RecordInfo record: records)
@@ -504,9 +508,9 @@
byte rec = record.getUserRecordType();
if (rec == BINDING_RECORD)
- {
-
+ {
BindingEncoding encodeBinding = new BindingEncoding();
+
encodeBinding.decode(buffer);
Filter filter = null;
@@ -523,9 +527,9 @@
bindings.add(binding);
}
else if (rec == DESTINATION_RECORD)
- {
-
+ {
DestinationEncoding destEnc = new DestinationEncoding();
+
destEnc.decode(buffer);
destinationIDMap.put(destEnc.destination, id);
@@ -599,12 +603,14 @@
//recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
{
- log.trace(preparedTransaction);
- EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
+ XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+
Xid xid = encodingXid.xid;
Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
+
List<MessageReference> messages = new ArrayList<MessageReference>();
+
List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
PageTransactionInfoImpl pageTransactionInfo = null;
@@ -629,7 +635,9 @@
message.decode(buff);
List<MessageReference> refs = postOffice.route(message);
+
messages.addAll(refs);
+
break;
}
case ACKNOWLEDGE_REF:
@@ -637,9 +645,9 @@
long messageID = record.id;
ACKEncoding encoding = new ACKEncoding();
+
encoding.decode(buff);
-
Queue queue = queues.get(encoding.queueID);
if (queue == null)
@@ -650,17 +658,22 @@
MessageReference removed = queue.removeReferenceWithID(messageID);
messagesToAck.add(removed);
+
if (removed == null)
{
throw new IllegalStateException("Failed to remove reference for " + messageID);
}
+
break;
}
case PAGE_TRANSACTION:
{
pageTransactionInfo = new PageTransactionInfoImpl();
+
pageTransactionInfo.decode(buff);
+
pageTransactionInfo.markIncomplete();
+
break;
}
default:
@@ -679,9 +692,9 @@
long messageID = record.id;
DeleteEncoding encoding = new DeleteEncoding();
+
encoding.decode(buff);
-
Queue queue = queues.get(encoding.queueID);
if (queue == null)
@@ -692,6 +705,7 @@
MessageReference removed = queue.removeReferenceWithID(messageID);
messagesToAck.add(removed);
+
if (removed == null)
{
throw new IllegalStateException("Failed to remove reference for " + messageID);
@@ -700,6 +714,7 @@
//now we recreate the state of the tx and add to the resource manager
tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+
resourceManager.putTransaction(xid, tx);
}
}
@@ -736,22 +751,20 @@
// Inner Classes ----------------------------------------------------------------------------
- private static class EncodingXid implements EncodingSupport
- {
-
+ private static class XidEncoding implements EncodingSupport
+ {
final Xid xid;
- EncodingXid(Xid xid)
+ XidEncoding(Xid xid)
{
this.xid = xid;
}
- EncodingXid(byte[] data)
+ XidEncoding(byte[] data)
{
xid = XidCodecSupport.decodeXid(new ByteBufferWrapper(ByteBuffer.wrap(data)));
}
-
public void decode(MessagingBuffer buffer)
{
throw new IllegalStateException("Non Supported Operation");
@@ -765,20 +778,17 @@
public int getEncodeSize()
{
return XidCodecSupport.getXidEncodeLength(xid);
- }
-
+ }
}
private static class BindingEncoding implements EncodingSupport
- {
-
+ {
SimpleString queueName;
SimpleString address;
SimpleString filter;
public BindingEncoding()
- {
-
+ {
}
public BindingEncoding(SimpleString queueName,
@@ -794,8 +804,7 @@
{
queueName = buffer.getSimpleString();
address = buffer.getSimpleString();
- filter = buffer.getNullableSimpleString();
-
+ filter = buffer.getNullableSimpleString();
}
public void encode(MessagingBuffer buffer)
@@ -816,7 +825,6 @@
private static class DestinationEncoding implements EncodingSupport
{
-
SimpleString destination;
DestinationEncoding(SimpleString destination)
@@ -877,16 +885,13 @@
public int getEncodeSize()
{
return 8 + 4;
- }
-
+ }
}
- private class QueueEncoding implements EncodingSupport
+ private static class QueueEncoding implements EncodingSupport
{
long queueID;
-
-
public QueueEncoding(long queueID)
{
super();
@@ -911,13 +916,11 @@
public int getEncodeSize()
{
return 8;
- }
-
+ }
}
- private class DeleteEncoding extends QueueEncoding
+ private static class DeleteEncoding extends QueueEncoding
{
-
public DeleteEncoding()
{
super();
@@ -926,13 +929,11 @@
public DeleteEncoding(long queueID)
{
super(queueID);
- }
-
+ }
}
- private class ACKEncoding extends QueueEncoding
+ private static class ACKEncoding extends QueueEncoding
{
-
public ACKEncoding()
{
super();
@@ -942,7 +943,5 @@
{
super(queueID);
}
- }
-
-
+ }
}
More information about the jboss-cvs-commits mailing list