[jboss-cvs] JBoss Messaging SVN: r4933 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 11 04:31:24 EDT 2008


Author: timfox
Date: 2008-09-11 04:31:24 -0400 (Thu, 11 Sep 2008)
New Revision: 4933

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
Log:
Some tweaks to journal


Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-11 08:31:24 UTC (rev 4933)
@@ -676,7 +676,7 @@
    /** 
     * <p>Load data accordingly to the record layouts</p>
     * 
-    * <p>Basic record laytout:</p>
+    * <p>Basic record layout:</p>
     * <table border=1>
     *   <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
     *   <tr><td>RecordType</td><td>Byte (1)</td></tr>

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-11 08:31:24 UTC (rev 4933)
@@ -47,7 +47,7 @@
    
    public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
    
-   public final ArrayList<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
+   public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
    
    public boolean prepared;
    

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-11 07:08:19 UTC (rev 4932)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-11 08:31:24 UTC (rev 4933)
@@ -225,7 +225,9 @@
          // Instead of updating the record, we delete the old one as that is better for reclaiming
          messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), null);
       }
+      
       pageTransaction.setRecordID(generateID());
+      
       messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), PAGE_TRANSACTION, pageTransaction);
    }
    
@@ -236,7 +238,9 @@
          // To avoid linked list effect on reclaiming, we delete and add a new record, instead of simply updating it
          messageJournal.appendDeleteRecordTransactional(txID, lastPage.getRecordId(), null);
       }
+      
       lastPage.setRecordId(generateID());
+      
       messageJournal.appendAddRecordTransactional(txID, lastPage.getRecordId(), LAST_PAGE, lastPage);
    }
 
@@ -257,7 +261,7 @@
   
    public void prepare(long txID, Xid xid) throws Exception
    {
-   	messageJournal.appendPrepareRecord(txID, new EncodingXid(xid));
+   	messageJournal.appendPrepareRecord(txID, new XidEncoding(xid));
    }
    
    public void commit(long txID) throws Exception
@@ -321,8 +325,8 @@
                long messageID = record.id;
 
                ACKEncoding encoding = new ACKEncoding();
+               
 				   encoding.decode(buff);
-
 					
 					Queue queue = queues.get(encoding.queueID);
 					
@@ -364,12 +368,10 @@
 					
 					reference.setDeliveryCount(deliveryUpdate.count);
 					
-					break;
-					
+					break;					
 				}
             case PAGE_TRANSACTION:
-            {
-               
+            {               
                PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
                
                pageTransactionInfo.decode(buff);
@@ -492,7 +494,9 @@
 			                   final List<Binding> bindings, final List<SimpleString> destinations) throws Exception
 	{
 		List<RecordInfo> records = new ArrayList<RecordInfo>();
+		
 		List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+		
 		long maxID = bindingsJournal.load(records, preparedTransactions);
 
 		for (RecordInfo record: records)
@@ -504,9 +508,9 @@
 			byte rec = record.getUserRecordType();
 			
 			if (rec == BINDING_RECORD)
-			{
-			   
+			{			   
 			   BindingEncoding encodeBinding = new BindingEncoding();
+			   
 			   encodeBinding.decode(buffer);
 			   
 			   Filter filter = null;
@@ -523,9 +527,9 @@
 				bindings.add(binding);      
 			}
 			else if (rec == DESTINATION_RECORD)
-			{
-			   
+			{			   
 			   DestinationEncoding destEnc = new DestinationEncoding();
+			   
 			   destEnc.decode(buffer);
 
 			   destinationIDMap.put(destEnc.destination, id);
@@ -599,12 +603,14 @@
       //recover prepared transactions
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
       {
-         log.trace(preparedTransaction);
-         EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
+         XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+         
          Xid xid = encodingXid.xid;
 
          Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
+         
          List<MessageReference> messages = new ArrayList<MessageReference>();
+         
          List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
          
          PageTransactionInfoImpl pageTransactionInfo = null;
@@ -629,7 +635,9 @@
                   message.decode(buff);
 
                   List<MessageReference> refs = postOffice.route(message);
+                  
                   messages.addAll(refs);
+                  
                   break;
                }
                case ACKNOWLEDGE_REF:
@@ -637,9 +645,9 @@
                   long messageID = record.id;
 
                   ACKEncoding encoding = new ACKEncoding();
+                  
                   encoding.decode(buff);
 
-
                   Queue queue = queues.get(encoding.queueID);
 
                   if (queue == null)
@@ -650,17 +658,22 @@
                   MessageReference removed = queue.removeReferenceWithID(messageID);
 
                   messagesToAck.add(removed);
+                  
                   if (removed == null)
                   {
                      throw new IllegalStateException("Failed to remove reference for " + messageID);
                   }
+                  
                   break;
                }
                case PAGE_TRANSACTION:
                {
                   pageTransactionInfo = new PageTransactionInfoImpl();
+                  
                   pageTransactionInfo.decode(buff);
+                  
                   pageTransactionInfo.markIncomplete();
+                  
                   break;
                }
                default:
@@ -679,9 +692,9 @@
             long messageID = record.id;
 
             DeleteEncoding encoding = new DeleteEncoding();
+            
             encoding.decode(buff);
 
-
             Queue queue = queues.get(encoding.queueID);
 
             if (queue == null)
@@ -692,6 +705,7 @@
             MessageReference removed = queue.removeReferenceWithID(messageID);
 
             messagesToAck.add(removed);
+            
             if (removed == null)
             {
                throw new IllegalStateException("Failed to remove reference for " + messageID);
@@ -700,6 +714,7 @@
          
          //now we recreate the state of the tx and add to the resource manager
          tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+         
          resourceManager.putTransaction(xid, tx);
       }
    }
@@ -736,22 +751,20 @@
 	
    // Inner Classes ----------------------------------------------------------------------------
 	
-	private static class EncodingXid implements EncodingSupport
-	{
-	   
+	private static class XidEncoding implements EncodingSupport
+	{	   
 	   final Xid xid;
 	   
-	   EncodingXid(Xid xid)
+	   XidEncoding(Xid xid)
 	   {
 	      this.xid = xid;
 	   }
 	   
-	   EncodingXid(byte[] data)
+	   XidEncoding(byte[] data)
 	   {
 	      xid = XidCodecSupport.decodeXid(new ByteBufferWrapper(ByteBuffer.wrap(data)));
 	   }
 	   
-
       public void decode(MessagingBuffer buffer)
       {
          throw new IllegalStateException("Non Supported Operation");
@@ -765,20 +778,17 @@
       public int getEncodeSize()
       {
          return XidCodecSupport.getXidEncodeLength(xid);
-      }
-	   
+      }	  
 	}
 
    private static class BindingEncoding implements EncodingSupport
-   {
-      
+   {      
       SimpleString queueName;
       SimpleString address;
       SimpleString filter;
 
       public BindingEncoding()
-      {
-         
+      {         
       }
       
       public BindingEncoding(SimpleString queueName,
@@ -794,8 +804,7 @@
       {
          queueName = buffer.getSimpleString();
          address = buffer.getSimpleString();
-         filter = buffer.getNullableSimpleString();
-         
+         filter = buffer.getNullableSimpleString();         
       }
 
       public void encode(MessagingBuffer buffer)
@@ -816,7 +825,6 @@
 
    private static class DestinationEncoding implements EncodingSupport
    {
-
       SimpleString destination;
       
       DestinationEncoding(SimpleString destination)
@@ -877,16 +885,13 @@
       public int getEncodeSize()
       {
          return 8 + 4;
-      }
-      
+      }      
    }
    
-   private class QueueEncoding implements EncodingSupport
+   private static class QueueEncoding implements EncodingSupport
    {
       long queueID;
       
-      
-
       public QueueEncoding(long queueID)
       {
          super();
@@ -911,13 +916,11 @@
       public int getEncodeSize()
       {
          return 8;
-      }
-      
+      }      
    }
    
-   private class DeleteEncoding extends QueueEncoding
+   private static class DeleteEncoding extends QueueEncoding
    {
-
       public DeleteEncoding()
       {
          super();
@@ -926,13 +929,11 @@
       public DeleteEncoding(long queueID)
       {
          super(queueID);
-      }
-      
+      }      
    }
    
-   private class ACKEncoding extends QueueEncoding
+   private static class ACKEncoding extends QueueEncoding
    {
-
       public ACKEncoding()
       {
          super();
@@ -942,7 +943,5 @@
       {
          super(queueID);
       }
-   }
-
-   
+   }   
 }




More information about the jboss-cvs-commits mailing list