[jboss-cvs] JBoss Messaging SVN: r4331 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 28 18:01:09 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-05-28 18:01:09 -0400 (Wed, 28 May 2008)
New Revision: 4331

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Refactoring on Journal

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -48,7 +48,7 @@
    
    void delete() throws Exception;
    
-   int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+   int write(ByteBuffer bytes, IOCallback callback) throws Exception;
    
    int write(ByteBuffer bytes, boolean sync) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -43,7 +43,9 @@
 	
 	private final long timeout;
 	
-	private AsynchronousFile aioFile;
+   private final boolean sync;
+
+   private AsynchronousFile aioFile;
 	
 	private AtomicLong position = new AtomicLong(0);
 
@@ -51,12 +53,13 @@
 	// serious performance problems. Because of that we make all the writes on AIO using a single thread.
 	private ExecutorService executor;
 	
-	public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final long timeout) throws Exception
+	public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final long timeout, final boolean sync) throws Exception
 	{
 		this.journalDir = journalDir;		
 		this.fileName = fileName;
 		this.maxIO = maxIO;
 		this.timeout = timeout;
+		this.sync = sync;
 	}
 	
 	public int getAlignment() throws Exception
@@ -188,18 +191,13 @@
 		
 		int bytesRead = read (bytes, waitCompletion);
 		
-		waitCompletion.waitLatch();
+		waitCompletion.waitLatch(timeout);
 		
-		if (waitCompletion.errorMessage != null)
-		{
-			throw new MessagingException(waitCompletion.errorCode, waitCompletion.errorMessage);
-		}
-		
 		return bytesRead;
 	}
 	
 	
-	public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback) throws Exception
+	public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
 	{
 		final int bytesToWrite = bytes.limit();
 		
@@ -210,34 +208,52 @@
 		return bytesToWrite;
 	}
 
+	public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+	{
+	   if (sync && this.sync)
+	   {
+	      WaitCompletion completion = new WaitCompletion();
+	      
+	      int bytesWritten = write(bytes, completion);
+	      
+	      completion.waitLatch(timeout);
+	      
+	      return bytesWritten;
+	   }
+	   else
+	   {
+	      return write (bytes, DummyCallback.instance);
+	   }
+		
+	}
+
+	
+	// Private methods
+	// -----------------------------------------------------------------------------------------------------
+	
    private void execWrite(final ByteBuffer bytes, final IOCallback callback,
-                          final int bytesToWrite, final long positionToWrite)
+         final int bytesToWrite, final long positionToWrite)
    {
       executor.execute(new Runnable()
-		{
-		   public void run()
-		   {
-		      try
-		      {
-		         aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
-		      }
-		      catch (Exception e)
-		      {
-		         log.warn (e.getMessage(), e);
-		         if (callback != null)
-		         {
-		            callback.onError(-1, e.getMessage());
-		         }
-		      }
-		   }
-		});      
+      {
+         public void run()
+         {
+            try
+            {
+               aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
+            } catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+               if (callback != null)
+               {
+                  callback.onError(-1, e.getMessage());
+               }
+            }
+         }
+      });
    }
+
 	
-	public int write(final ByteBuffer bytes, final boolean sync) throws Exception
-	{
-		return write (bytes, sync, DummyCallback.instance);
-	}
-	
 	private void checkOpened() throws Exception
 	{
 		if (aioFile == null || !opened)
@@ -281,9 +297,23 @@
 			latch.countDown();			
 		}
 		
-		public void waitLatch() throws Exception
+		public boolean waitLatch(long timeout) throws Exception
 		{
-			latch.await();
+			if (latch.await(timeout, TimeUnit.MILLISECONDS))
+			{
+   	      if (errorMessage != null)
+   	      {
+   	         throw new MessagingException(errorCode, errorMessage);
+   	      }
+   	      return true;
+			}
+			else
+			{
+			   return false;
+			}
+	      
+
+			
 		}		
 	}	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -28,7 +28,7 @@
 	
 	public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO, final long timeout) throws Exception
 	{
-		return new AIOSequentialFile(journalDir, fileName, maxIO, timeout);
+		return new AIOSequentialFile(journalDir, fileName, maxIO, timeout, sync);
 	}
 	
    public boolean supportsCallbacks()

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -288,7 +288,7 @@
       if (shouldUseCallback)
       {
          SimpleCallback callback = new SimpleCallback();
-         usedFile = appendRecord(bb.getBuffer(), true, callback);
+         usedFile = appendRecord(bb.getBuffer(), callback);
          callback.waitCompletion(aioTimeout);
       }
       else
@@ -323,7 +323,7 @@
       if (shouldUseCallback)
       {         
          SimpleCallback callback = new SimpleCallback();
-         usedFile = appendRecord(bb, true, callback);
+         usedFile = appendRecord(bb, callback);
          callback.waitCompletion(aioTimeout);
       }
       else
@@ -364,7 +364,7 @@
       if (shouldUseCallback)
       {
          SimpleCallback callback = new SimpleCallback();
-         usedFile = appendRecord(bb, true, callback);
+         usedFile = appendRecord(bb, callback);
          callback.waitCompletion(aioTimeout);
       }
       else
@@ -403,7 +403,7 @@
       if (shouldUseCallback)
       {
          SimpleCallback callback = new SimpleCallback();
-         appendRecord(bb, true, callback);
+         appendRecord(bb, callback);
          callback.waitCompletion(aioTimeout);
       }
       else
@@ -447,7 +447,7 @@
       {
          TransactionCallback callback = getTransactionCallback(txID);
          callback.countUp();
-         usedFile = appendRecord(bb.getBuffer(), false, callback);
+         usedFile = appendRecord(bb.getBuffer(), callback);
       }
       else
       {
@@ -483,7 +483,7 @@
 	   bb.put(DONE);     
 	   bb.rewind();
 	   
-	   JournalFile usedFile = appendRecord(bb, false, callback);
+	   JournalFile usedFile = appendRecord(bb, callback);
 	   
 	   TransactionNegPos tx = getTransactionInfo(txID);
 	   
@@ -514,7 +514,7 @@
 		bb.put(DONE);     
 		bb.rewind();
 		
-		JournalFile usedFile = appendRecord(bb, false, callback);
+		JournalFile usedFile = appendRecord(bb, callback);
 		
 		TransactionNegPos tx = getTransactionInfo(txID);
 		
@@ -541,7 +541,7 @@
 		bb.put(DONE);        
 		bb.rewind();
 		
-		JournalFile usedFile = appendRecord(bb, false, callback);      
+		JournalFile usedFile = appendRecord(bb, callback);      
 		
 		TransactionNegPos tx = getTransactionInfo(txID);
 		
@@ -574,7 +574,7 @@
 		bb.put(DONE);           
 		bb.rewind();
 		
-		JournalFile usedFile = appendRecord(bb, true, callback);    
+		JournalFile usedFile = appendRecord(bb, callback);    
 		
 		tx.prepare(usedFile);
 	}
@@ -607,7 +607,7 @@
 		{
          TransactionCallback callback = getTransactionCallback(txID);
          callback.countUp();
-   		usedFile = appendRecord(bb, true, callback); 
+   		usedFile = appendRecord(bb, callback); 
    		callback.waitCompletion(aioTimeout);
 		}
 		else
@@ -648,7 +648,7 @@
 		if (shouldUseCallback)
 		{
 	      SimpleCallback callback = new SimpleCallback();
-   		usedFile = appendRecord(bb, true, callback);       
+   		usedFile = appendRecord(bb, callback);       
    		callback.waitCompletion(aioTimeout);
 		}
 		else
@@ -1459,7 +1459,7 @@
 		}
 	}
 	
-	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+	private JournalFile appendRecord(final ByteBuffer bb, final IOCallback callback) throws Exception
 	{
 		lock.acquire();
 		
@@ -1468,7 +1468,7 @@
 		try
 		{                 
 			checkFile(size);
-			currentFile.getFile().write(bb, sync, callback);       
+			currentFile.getFile().write(bb, callback);       
 			currentFile.extendOffset(size);
 			return currentFile;
 		}

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -158,24 +158,34 @@
 	
 	public int write(final ByteBuffer bytes, final boolean sync) throws Exception
 	{
-		return write(bytes, sync, null);
+      int bytesRead = channel.write(bytes);
+      
+      if (sync && this.sync)
+      {
+         channel.force(false);
+      }
+      
+      return bytesRead;
 	}
 	
-	public int write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+	public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
 	{
-		int bytesRead = channel.write(bytes);
-		
-		if (sync && this.sync)
-		{
-			channel.force(false);
-		}
-		
-		if (callback != null)
-		{
-			callback.done();
-		}
-		
-		return bytesRead;
+	   try
+	   {
+         int bytesRead = channel.write(bytes);
+         
+         if (callback != null)
+         {
+            callback.done();
+         }
+         
+         return bytesRead;
+	   }
+	   catch (Exception e)
+	   {
+	      callback.onError(-1, e.getMessage());
+	      throw e;
+	   }
 	}
 	
 	public void position(final int pos) throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -138,7 +138,7 @@
             buffer.put((byte)'b');
          }
          
-         file.write(buffer, true, callback);
+         file.write(buffer, callback);
       }
       
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -190,21 +190,15 @@
       byte[] bytes3 = s3.getBytes("UTF-8");
       ByteBuffer bb3 = factory.wrapBuffer(bytes3);
       
-      FakeCallback callback = new FakeCallback();
-      int bytesWritten = sf.write(bb1, true, callback);
-      callback.waitComplete();
+      int bytesWritten = sf.write(bb1, true);
       
       assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
       
-      callback = new FakeCallback();
-      bytesWritten = sf.write(bb2, true, callback);
-      callback.waitComplete();
+      bytesWritten = sf.write(bb2, true);
       
       assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
       
-      callback = new FakeCallback();
-      bytesWritten = sf.write(bb3, true, callback);
-      callback.waitComplete();
+      bytesWritten = sf.write(bb3, true);
       
       assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
       
@@ -214,9 +208,7 @@
       ByteBuffer rb2 = factory.newBuffer(bytes2.length);
       ByteBuffer rb3 = factory.newBuffer(bytes3.length);
 
-      callback = new FakeCallback();
-      int bytesRead = sf.read(rb1, callback);
-      callback.waitComplete();
+      int bytesRead = sf.read(rb1);
       assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead);     
 
       for (int i=0; i<bytes1.length; i++)
@@ -224,9 +216,7 @@
       	assertEquals(bytes1[i], rb1.get(i));
       }
       
-      callback = new FakeCallback();
-      bytesRead = sf.read(rb2, callback);
-      callback.waitComplete();
+      bytesRead = sf.read(rb2);
       assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead);     
       for (int i=0; i<bytes2.length; i++)
       {
@@ -234,9 +224,7 @@
       }
       
       
-      callback = new FakeCallback();
-      bytesRead = sf.read(rb3, callback);
-      callback.waitComplete();
+      bytesRead = sf.read(rb3);
       assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead);     
       for (int i=0; i<bytes3.length; i++)
       {
@@ -265,21 +253,15 @@
       byte[] bytes3 = s3.getBytes("UTF-8");
       ByteBuffer bb3 = factory.wrapBuffer(bytes3);
       
-      FakeCallback callback = new FakeCallback();
-      int bytesWritten = sf.write(bb1, true, callback);
-      callback.waitComplete();
+      int bytesWritten = sf.write(bb1, true);
       
       assertEquals(bb1.limit(), bytesWritten);
       
-      callback = new FakeCallback();
-      bytesWritten = sf.write(bb2, true, callback);
-      callback.waitComplete();
+      bytesWritten = sf.write(bb2, true);
       
       assertEquals(bb2.limit(), bytesWritten);
       
-      callback = new FakeCallback();
-      bytesWritten = sf.write(bb3, true, callback);
-      callback.waitComplete();
+      bytesWritten = sf.write(bb3, true);
       
       assertEquals(bb3.limit(), bytesWritten);
       
@@ -329,9 +311,7 @@
       byte[] bytes1 = s1.getBytes("UTF-8");
       ByteBuffer bb1 = factory.wrapBuffer(bytes1);
       
-      FakeCallback callback = new FakeCallback();
-      int bytesWritten = sf.write(bb1, true, callback);
-      callback.waitComplete();
+      int bytesWritten = sf.write(bb1, true);
       
       assertEquals(bb1.limit(), bytesWritten);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-05-28 22:01:09 UTC (rev 4331)
@@ -229,7 +229,7 @@
          data.position(pos);
       }
 
-      public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+      public int write(ByteBuffer bytes, IOCallback callback) throws Exception
       {
          if (!open)
          {
@@ -252,7 +252,7 @@
       
       public int write(ByteBuffer bytes, boolean sync) throws Exception
       {
-         return write(bytes, sync, null);
+         return write(bytes, null);
       }
       
       private void checkAndResize(int size)




More information about the jboss-cvs-commits mailing list