[jboss-cvs] JBoss Messaging SVN: r4756 - trunk/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 31 05:41:34 EDT 2008


Author: timfox
Date: 2008-07-31 05:41:33 -0400 (Thu, 31 Jul 2008)
New Revision: 4756

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
Mainly cosmetic changes and adding final modifiers


Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-31 09:30:40 UTC (rev 4755)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-31 09:41:33 UTC (rev 4756)
@@ -91,8 +91,7 @@
    private static final int SIZE_BYTE = 1;
    
    public static final int MIN_FILE_SIZE = 1024;
-   
-   
+      
    public static final int SIZE_HEADER = 4;
 
    //Record markers - they must be all unique
@@ -191,7 +190,7 @@
    
    private final AtomicLong transactionIDSequence = new AtomicLong(0);
    
-   private Reclaimer reclaimer = new Reclaimer();
+   private final Reclaimer reclaimer = new Reclaimer();
    
    // Static --------------------------------------------------------
    
@@ -202,16 +201,16 @@
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging Journal 
    private static final void trace(String message)
-   {
+   {      
       log.trace(message);
    }
    
    // Constructors --------------------------------------------------
    
    public JournalImpl(final int fileSize, final int minFiles,
-         final boolean syncTransactional, final boolean syncNonTransactional,
-         final SequentialFileFactory fileFactory, 
-         final String filePrefix, final String fileExtension, final int maxAIO)
+                      final boolean syncTransactional, final boolean syncNonTransactional,
+                      final SequentialFileFactory fileFactory, 
+                      final String filePrefix, final String fileExtension, final int maxAIO)
    {
       if (fileSize < MIN_FILE_SIZE)
       {
@@ -303,8 +302,7 @@
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
-      
-      
+            
       int size = SIZE_ADD_RECORD + record.length;
       
       ByteBuffer bb = fileFactory.newBuffer(size);
@@ -317,8 +315,7 @@
       bb.put(record);		
       bb.putInt(size);			
       bb.rewind();
-      
-      
+           
       try
       {                 
          lock.acquire();
@@ -361,10 +358,9 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
          
          posFiles.addUpdateFile(usedFile);
@@ -402,10 +398,10 @@
       bb.putInt(size);     
       bb.rewind();
       
-      try
-      {                 
-         lock.acquire();
+      lock.acquire();
 
+      try
+      {                          
          JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
          
          posFiles.addUpdateFile(usedFile);
@@ -416,7 +412,7 @@
       }
    }
    
-   public void appendDeleteRecord(long id) throws Exception
+   public void appendDeleteRecord(final long id) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -440,10 +436,10 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-         
+      {                          
          JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
          
          posFiles.addDelete(usedFile);
@@ -483,16 +479,15 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-         
+      {                          
          JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
          
          JournalTransaction tx = getTransactionInfo(txID);
          
-         tx.addPositive(usedFile, id);
-         
+         tx.addPositive(usedFile, id);         
       }
       finally
       {
@@ -521,16 +516,15 @@
       bb.putInt(size);
       bb.rewind();
       
-      try
-      {                 
-         lock.acquire();
+      lock.acquire();
 
+      try
+      {                          
          JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
          
          JournalTransaction tx = getTransactionInfo(txID);
          
          tx.addPositive(usedFile, id);
-
       }
       finally
       {
@@ -559,16 +553,15 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
          
          JournalTransaction tx = getTransactionInfo(txID);
          
          tx.addPositive(usedFile, id);
-
       }
       finally
       {
@@ -586,8 +579,7 @@
       int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize(); 
       
       ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size)); 
-      
-      
+            
       bb.putByte(UPDATE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
       bb.putLong(txID);
@@ -598,16 +590,15 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
          
          JournalTransaction tx = getTransactionInfo(txID);
          
          tx.addPositive(usedFile, id);
-
       }
       finally
       {
@@ -633,16 +624,15 @@
       bb.putInt(size);     
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
          
          JournalTransaction tx = getTransactionInfo(txID);
          
          tx.addNegative(usedFile, id);      
-
       }
       finally
       {
@@ -666,15 +656,13 @@
       
       ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx);
       
+      lock.acquire();
       
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
          
          tx.prepare(usedFile);
-
       }
       finally
       {
@@ -698,26 +686,20 @@
       
       ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx);
       
+      lock.acquire();
       
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
          
          transactionCallbacks.remove(txID);
          
          tx.commit(usedFile);
-
       }
       finally
       {
          lock.release();
       }
-      
-      
-      
-      
    }
    
    public void appendRollbackRecord(final long txID) throws Exception
@@ -744,16 +726,15 @@
       bb.putInt(size);        
       bb.rewind();
       
+      lock.acquire();
+      
       try
-      {                 
-         lock.acquire();
-
+      {                          
          JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
          
          transactionCallbacks.remove(txID);
          
          tx.rollback(usedFile);
-
       }
       finally
       {
@@ -762,18 +743,14 @@
    }
    
    public synchronized long load(final List<RecordInfo> committedRecords,
-         final List<PreparedTransactionInfo> preparedTransactions) throws Exception
-   {
-      
+                                 final List<PreparedTransactionInfo> preparedTransactions) throws Exception
+   {      
       final Set<Long> recordsToDelete = new HashSet<Long>();
       final List<RecordInfo> records = new ArrayList<RecordInfo>();
-
       
-      long maxID =  load (new LoadManager()
+      long maxID = load (new LoadManager()
       {
-
-         public void addPreparedTransaction(
-               PreparedTransactionInfo preparedTransaction)
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
          {
             preparedTransactions.add(preparedTransaction);
          }
@@ -791,11 +768,9 @@
          public void deleteRecord(long id)
          {
             recordsToDelete.add(id);
-         }
-         
+         }        
       });
-      
-      
+            
       for (RecordInfo record: records)
       {
          if (!recordsToDelete.contains(record.id))
@@ -807,9 +782,8 @@
       return maxID;
    }
    
-   public synchronized long load (LoadManager loadManager) throws Exception
-   {
-      
+   public synchronized long load(final LoadManager loadManager) throws Exception
+   {      
       if (state != STATE_STARTED)
       {
          throw new IllegalStateException("Journal must be in started state");
@@ -835,10 +809,8 @@
          
          if (bytesRead != fileSize)
          {
-            //deal with this better
-            
             throw new IllegalStateException("File is wrong size " + bytesRead +
-                  " expected " + fileSize + " : " + file.getFile().getFileName());
+                                            " expected " + fileSize + " : " + file.getFile().getFileName());
          }
 
          //First long is the ordering timestamp, we just jump its position
@@ -1058,8 +1030,7 @@
                   Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
 
                   if (tx != null)
-                  {
-                     
+                  {                     
                      tx.prepared = true;
                      
                      JournalTransaction journalTransaction = transactionInfos.get(transactionID);
@@ -1068,11 +1039,9 @@
                      {
                         throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
+                                          
+                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
                      
-                     
-                     boolean healthy = checkTransactionHealth(
-                           journalTransaction, orderedFiles, values);
-                     
                      if (healthy)
                      {
                         journalTransaction.prepare(file);
@@ -1098,8 +1067,7 @@
                   Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
 
                   if (tx != null)
-                  {
-                     
+                  {                     
                      JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
                      
                      if (journalTransaction == null)
@@ -1107,10 +1075,8 @@
                         throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
 
-                     boolean healthy = checkTransactionHealth(
-                           journalTransaction, orderedFiles, values);
-                     
-                     
+                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+                                          
                      if (healthy)
                      {
                         for (RecordInfo txRecord: tx.recordInfos)
@@ -1165,7 +1131,7 @@
                default:                
                {
                   throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                        " is corrupt, invalid record type " + recordType);
+                                                  " is corrupt, invalid record type " + recordType);
                }
             }
             
@@ -1467,7 +1433,6 @@
       this.autoReclaim = false;
    }
    
-
    // MessagingComponent implementation ---------------------------------------------------
    
    public synchronized boolean isStarted()
@@ -1485,7 +1450,6 @@
       this.filesExecutor =  Executors.newSingleThreadExecutor();
       
       state = STATE_STARTED;
-
    }
    
    public synchronized void stop() throws Exception
@@ -1527,7 +1491,7 @@
    // Private -----------------------------------------------------------------------------
 
    // Discard the old JournalFile and set it with a new ID
-   private JournalFile reinitializeFile(JournalFile file) throws Exception
+   private JournalFile reinitializeFile(final JournalFile file) throws Exception
    {
       int newOrderingID = generateOrderingID();
       
@@ -1548,35 +1512,42 @@
       jf.setOffset(bytesWritten);
       
       sf.close();
+      
       return jf;
    }
    
    @SuppressWarnings("unchecked")
-   private Pair<Integer, Integer>[] readReferencesOnTransaction(int variableSize, ByteBuffer bb)
+   private Pair<Integer, Integer>[] readReferencesOnTransaction(final int variableSize, final ByteBuffer bb)
    {
       int numberOfFiles = variableSize / (SIZE_INT * 2);
+      
       Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
+      
       for (int i = 0; i < numberOfFiles; i++)
       {
          values[i] = new Pair(bb.getInt(), bb.getInt());
       }
+      
       return values;
    }
 
-   private boolean checkTransactionHealth(
-         JournalTransaction journalTransaction, List<JournalFile> orderedFiles,
-         Pair<Integer, Integer>[] readReferences)
+   private boolean checkTransactionHealth(final JournalTransaction journalTransaction,
+                                          final List<JournalFile> orderedFiles,
+                                          final Pair<Integer, Integer>[] readReferences)
    {
       boolean healthy = true;
+      
       Map<Integer, AtomicInteger> refMap = journalTransaction.getElementsSummary();
       
       for (Pair<Integer, Integer> ref: readReferences)
       {
          AtomicInteger counter = refMap.get(ref.a);
+         
          if (counter == null)
          {
             // Couldn't find the counter, but if part of the transaction was reclaimed it is ok!
             boolean found = false;
+            
             for (JournalFile lookupFile: orderedFiles)
             {
                if (lookupFile.getOrderingID() == ref.a)
@@ -1643,7 +1614,7 @@
       return recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX;
    }
    
-   private int getRecordSize(byte recordType)
+   private int getRecordSize(final byte recordType)
    {
       // The record size (without the variable portion)
       int recordSize = 0;
@@ -1733,8 +1704,7 @@
     * You need to call lock.acquire before calling this method
     * */
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
-   {
-      
+   {      
       int size = bb.capacity();
       checkFile(size);
       bb.position(SIZE_BYTE);
@@ -1760,7 +1730,7 @@
       return currentFile;
    }
    
-   private JournalFile createFile(boolean keepOpened) throws Exception
+   private JournalFile createFile(final boolean keepOpened) throws Exception
    {
       int orderingID = generateOrderingID();
       
@@ -1794,7 +1764,7 @@
       return info;
    }
    
-   private void openFile(JournalFile file) throws Exception
+   private void openFile(final JournalFile file) throws Exception
    {
       file.getFile().open();
       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
@@ -1838,6 +1808,7 @@
    private JournalFile enqueueOpenFile() throws InterruptedException
    {
       if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      
       filesExecutor.execute(new Runnable()
       {
          public void run()
@@ -1852,6 +1823,7 @@
             }
          }
       });
+      
       if (autoReclaim)
       {
          filesExecutor.execute(new Runnable()
@@ -1880,12 +1852,10 @@
             log.warn("Couldn't open a file in 60 Seconds", new Exception ("Warning: Couldn't open a file in 60 Seconds"));
          }
       }
-      
-      
+            
       return nextFile;
    }
-   
-   
+      
    /** 
     * 
     * Open a file and place it into the openedFiles queue
@@ -1912,8 +1882,7 @@
       
       openedFiles.offer(nextOpenedFile);
    }
-   
-   
+      
    private void closeFile(final JournalFile file)
    {
       this.filesExecutor.execute(new Runnable() { public void run()
@@ -2072,7 +2041,6 @@
       {
          return this.invalid;
       }
-
       
       public Map<Integer, AtomicInteger> getElementsSummary()
       {
@@ -2151,7 +2119,7 @@
          }        
       }
       
-      public void rollback(JournalFile file)
+      public void rollback(final JournalFile file)
       {     
          //Now add negs for the pos we added in each file in which there were transactional operations
          //Note that we do this on rollback as we do on commit, since we need to ensure the file containing
@@ -2165,7 +2133,7 @@
          }
       }
       
-      public void prepare(JournalFile file)
+      public void prepare(final JournalFile file)
       {
          //We don't want the prepare record getting deleted before time
          
@@ -2199,20 +2167,18 @@
          }  
       }
       
-      private AtomicInteger getCounter(JournalFile file)
+      private AtomicInteger getCounter(final JournalFile file)
       {
          AtomicInteger value = numberOfElements.get(file.getOrderingID());
          
          if (value == null)
          {
             value = new AtomicInteger();
-            numberOfElements.put(file.getOrderingID(), value);
-            
+            numberOfElements.put(file.getOrderingID(), value);            
          }
          
          return value;
       }
       
    }
-
 }




More information about the jboss-cvs-commits mailing list