[jboss-cvs] JBoss Messaging SVN: r4671 - trunk/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 8 16:44:28 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-08 16:44:28 -0400 (Tue, 08 Jul 2008)
New Revision: 4671

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
JBMESSAGING-1347 - Revisiting signatures on AIO/NIO and syncs.

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-08 19:43:02 UTC (rev 4670)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-08 20:44:28 UTC (rev 4671)
@@ -285,7 +285,7 @@
       bb.putByte(DONE);        
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional);
+      JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
      
       posFilesMap.put(id, new PosFiles(usedFile));
    }
@@ -309,7 +309,7 @@
 		bb.put(DONE);			
 		bb.rewind();
 		
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional);
+      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
 		
 		posFilesMap.put(id, new PosFiles(usedFile));
 	}
@@ -340,7 +340,7 @@
 		bb.put(DONE);     
 		bb.rewind();
 		   
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional);
+      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
       		
 		posFiles.addUpdateFile(usedFile);
 	}
@@ -370,7 +370,7 @@
 		bb.put(DONE);     
 		bb.rewind();
 		
-      appendRecord(bb, syncNonTransactional);      
+      appendRecord(bb, syncNonTransactional, null);      
 	}     
 	
 	public long getTransactionID()
@@ -403,16 +403,7 @@
       
       JournalFile usedFile;
 
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb.getBuffer(), callback);
-      }
-      else
-      {
-         usedFile = appendRecord(bb.getBuffer(), false);
-      }
+      usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
       
       TransactionNegPos tx = getTransactionInfo(txID);
       
@@ -441,16 +432,7 @@
 	   
 	   JournalFile usedFile;
 	   
-	   if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-      }
-      else
-      {
-         usedFile = appendRecord(bb, false);
-      }
+      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
 	   
 	   TransactionNegPos tx = getTransactionInfo(txID);
 	   
@@ -479,16 +461,7 @@
 		
       JournalFile usedFile;
       
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-      }
-      else
-      {
-         usedFile = appendRecord(bb, false);
-      }
+      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
 		
       TransactionNegPos tx = getTransactionInfo(txID);
       
@@ -514,17 +487,8 @@
 		
       JournalFile usedFile;
       
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-      }
-      else
-      {
-         usedFile = appendRecord(bb, false);
-      }
-      
+      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+
       TransactionNegPos tx = getTransactionInfo(txID);
 		
 		tx.addNeg(usedFile, id);      
@@ -555,17 +519,7 @@
 							
 		JournalFile usedFile;
       
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-         callback.waitCompletion(aioTimeout);     
-      }
-      else
-      {
-         usedFile = appendRecord(bb, syncTransactional);
-      }
+      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
       
 		tx.prepare(usedFile);
 	}
@@ -595,17 +549,7 @@
 		
 		JournalFile usedFile;
       
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
-      {
-         TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-         callback.waitCompletion(aioTimeout);
-      }
-      else
-      {
-         usedFile = appendRecord(bb, syncTransactional);
-      }
+      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
 		
 		transactionCallbacks.remove(txID);
 		
@@ -637,17 +581,8 @@
 		bb.rewind();
 		
 		JournalFile usedFile;
-		if (fileFactory.isSupportsCallbacks() && syncTransactional)
-		{
-		   TransactionCallback callback = getTransactionCallback(txID);
-         callback.countUp();
-         usedFile = appendRecord(bb, callback);
-         callback.waitCompletion(aioTimeout);
-		}
-		else
-		{
-		   usedFile = appendRecord(bb, syncTransactional);      
-		}
+
+		usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
 		
 		transactionCallbacks.remove(txID);
 				
@@ -1484,8 +1419,8 @@
 	// Public -----------------------------------------------------------------------------
 	
 	// Private -----------------------------------------------------------------------------
-	
-	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync) throws Exception
+
+	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
 	{
 		lock.acquire();
 		
@@ -1494,7 +1429,18 @@
 		try
 		{                 
 			checkFile(size);
-			currentFile.getFile().write(bb, sync);       
+			if (callback != null)
+			{
+			   currentFile.getFile().write(bb, callback);
+			   if (sync)
+			   {
+			      callback.waitCompletion(aioTimeout);
+			   }
+			}
+			else
+			{
+	         currentFile.getFile().write(bb, sync);       
+			}
 			currentFile.extendOffset(size);
 			return currentFile;
 		}
@@ -1504,25 +1450,6 @@
 		}
 	}
 	
-	private JournalFile appendRecord(final ByteBuffer bb, final IOCallback callback) throws Exception
-	{
-		lock.acquire();
-		
-		int size = bb.capacity();
-		
-		try
-		{                 
-			checkFile(size);
-			currentFile.getFile().write(bb, callback);       
-			currentFile.extendOffset(size);
-			return currentFile;
-		}
-		finally
-		{
-			lock.release();
-		}
-	}
-	
 	private void repairFrom(final int pos, final JournalFile file) throws Exception
 	{
 		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
@@ -1705,15 +1632,23 @@
 	
    private TransactionCallback getTransactionCallback(final long transactionId)
    {
-      TransactionCallback callback = this.transactionCallbacks.get(transactionId);
-      
-      if (callback == null)
+      if (fileFactory.isSupportsCallbacks() && syncTransactional)
       {
-         callback = new TransactionCallback();
-         transactionCallbacks.put(transactionId, callback);
+         TransactionCallback callback = this.transactionCallbacks.get(transactionId);
+         
+         if (callback == null)
+         {
+            callback = new TransactionCallback();
+            transactionCallbacks.put(transactionId, callback);
+         }
+
+         callback.countUp();
+         return callback;
       }
-      
-      return callback;
+      else
+      {
+         return null;
+      }
    }
    
 	// Inner classes ---------------------------------------------------------------------------




More information about the jboss-cvs-commits mailing list