[jboss-cvs] JBoss Messaging SVN: r4391 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 5 08:18:05 EDT 2008


Author: timfox
Date: 2008-06-05 08:18:05 -0400 (Thu, 05 Jun 2008)
New Revision: 4391

Modified:
   trunk/docs/userguide/en/modules/introduction.xml
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
Log:
Fixed some thread safety issues with AIO callbacks and removed some unused stuff


Modified: trunk/docs/userguide/en/modules/introduction.xml
===================================================================
--- trunk/docs/userguide/en/modules/introduction.xml	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/docs/userguide/en/modules/introduction.xml	2008-06-05 12:18:05 UTC (rev 4391)
@@ -2,7 +2,7 @@
 <chapter id="introduction">
    <title>Introduction</title>
 
-   <section id="intro.intro">
+   <section id="introduction.intro">
       <para>JBoss Messaging 2.0 alpha, is a bare-bones messaging system. This release is designed to show case the elegant architecture and high performance transport and persistence. Many other features, including state of the art clustering will be added before the final general availability (GA) release</para>
       
       <para>JBoss Messaging builds upon the solid performance of JBoss Messaging 1.4 to bring unrivalled levels of performance and scalability</para>

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -273,11 +273,11 @@
 	
 	private static class WaitCompletion implements IOCallback
 	{		
-		CountDownLatch latch = new CountDownLatch(1);
+		private final CountDownLatch latch = new CountDownLatch(1);
 		
-		String errorMessage;
+		private volatile String errorMessage;
 		
-		int errorCode = 0;
+		private volatile int errorCode = 0;
 		
 		public void done()
 		{
@@ -307,9 +307,6 @@
 			{
 			   return false;
 			}
-	      
-
-			
 		}		
 	}	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -1667,46 +1667,13 @@
    
 	// Inner classes ---------------------------------------------------------------------------
 
-   private static class SimpleCallback implements IOCallback
-   {      
-      private String errorMessage;
-      
-      private int errorCode;
-      
-      private CountDownLatch latch = new CountDownLatch(1);
-
-      public void done()
-      {
-         latch.countDown();
-      }
-
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         this.errorMessage = errorMessage;
-         this.errorCode = errorCode;
-         latch.countDown();         
-      }
-      
-      public void waitCompletion(long timeout) throws InterruptedException 
-      {
-         if (!latch.await(timeout, TimeUnit.MILLISECONDS))
-         {
-            throw new IllegalStateException("Timeout!");
-         }
-         if (errorMessage != null)
-         {
-            throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
-         }
-     }      
-   }
-   
    private static class TransactionCallback implements IOCallback
    {      
       private final VariableLatch countLatch = new VariableLatch();
       
-      private String errorMessage = null;
+      private volatile String errorMessage = null;
       
-      private int errorCode = 0;
+      private volatile int errorCode = 0;
       
       public void countUp()
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
@@ -94,29 +95,28 @@
 
    protected static class CountDownCallback implements AIOCallback
    {
-
-       CountDownLatch latch;
+       private final CountDownLatch latch;
        
-       public CountDownCallback(CountDownLatch latch)
+       public CountDownCallback(final CountDownLatch latch)
        {
            this.latch = latch;
        }
        
-       boolean doneCalled = false;
-       boolean errorCalled = false;
-       int timesDoneCalled = 0;
+       volatile boolean doneCalled = false;
+       volatile boolean errorCalled = false;
+       final AtomicInteger timesDoneCalled = new AtomicInteger(0);
 
        public void done()
        {
            doneCalled = true;
-           timesDoneCalled++;
+           timesDoneCalled.incrementAndGet();
            if (latch != null) 
            {
                latch.countDown();
            }
        }
 
-       public void onError(int errorCode, String errorMessage)
+       public void onError(final int errorCode, final String errorMessage)
        {
            errorCalled = true;
            if (latch != null)
@@ -125,9 +125,8 @@
                latch.countDown();
            }
            System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
-           
+          
        }
-
    }
 
    

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -138,14 +138,14 @@
          
          for (CountDownCallback callback : list)
          {
-            assertEquals(1, callback.timesDoneCalled);
+            assertEquals(1, callback.timesDoneCalled.get());
             assertTrue(callback.doneCalled);
             assertFalse(callback.errorCalled);
          }
          
          for (CountDownCallback callback : list2)
          {
-            assertEquals(1, callback.timesDoneCalled);
+            assertEquals(1, callback.timesDoneCalled.get());
             assertTrue(callback.doneCalled);
             assertFalse(callback.errorCalled);
          }
@@ -183,10 +183,10 @@
    public void testInvalidReads() throws Exception
    {
       class LocalCallback implements AIOCallback
-      {
+      {         
+         private CountDownLatch latch = new CountDownLatch(1);
          
-         CountDownLatch latch = new CountDownLatch(1);
-         boolean error;
+         volatile boolean error;
          
          public void done()
          {
@@ -533,7 +533,7 @@
          
          for (CountDownCallback tmp : list)
          {
-            assertEquals(1, tmp.timesDoneCalled);
+            assertEquals(1, tmp.timesDoneCalled.get());
             assertTrue(tmp.doneCalled);
             assertFalse(tmp.errorCalled);
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -217,50 +217,7 @@
       journal.start();
       
       journal.load(new ArrayList<RecordInfo>(), null);
-      
-
-      final CountDownLatch latch = new CountDownLatch((int)numMessages);
-      
-      
-      class LocalCallback implements IOCallback
-      {
-
-         int i=0;
-         String message = null;
-         boolean done = false;
-         CountDownLatch latch;
-         
-         public LocalCallback(int i, CountDownLatch latch)
-         {
-            this.i = i;
-            this.latch = latch;
-         }
-         public void done()
-         {
-            synchronized (this)
-            {
-               if (done)
-               {
-                  message = "done received in duplicate";
-               }
-               done = true;
-               this.latch.countDown();
-            }
-         }
-
-         public void onError(int errorCode, String errorMessage)
-         {
-            synchronized (this)
-            {
-               System.out.println("********************** Error = " + (i++));
-               message = errorMessage;
-               latch.countDown();
-            }
-         }
-         
-      }
-      
-      
+            
       log.info("Adding data");
       byte[] data = new byte[700];
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -71,10 +72,9 @@
    public void testBlockCallback() throws Exception
    {
       class BlockCallback implements IOCallback
-      {
-         
-         int countDone = 0;
-         int countError = 0;
+      {         
+         AtomicInteger countDone = new AtomicInteger(0);
+         AtomicInteger countError = new AtomicInteger(0);
          CountDownLatch blockLatch;
 
          BlockCallback()
@@ -88,20 +88,17 @@
          }
          
          public void done()
-         {
-            
-           try
+         {            
+            try
             {
                blockLatch.await();
-            } catch (InterruptedException e)
+            }
+            catch (InterruptedException e)
             {
                e.printStackTrace();
             }
-            
-            countDone ++;
-            
-            
-            
+
+            countDone.incrementAndGet();
          }
 
          public void onError(int errorCode, String errorMessage)
@@ -109,12 +106,13 @@
             try
             {
                blockLatch.await();
-            } catch (InterruptedException e)
+            }
+            catch (InterruptedException e)
             {
                e.printStackTrace();
             }
             
-            countError ++;
+            countError.incrementAndGet();
          }
       }
       
@@ -144,11 +142,9 @@
       
       callback.release();
       file.close();
-      assertEquals(NUMBER_OF_RECORDS, callback.countDone);
-      assertEquals(0, callback.countError);
+      assertEquals(NUMBER_OF_RECORDS, callback.countDone.get());
+      assertEquals(0, callback.countError.get());
       
-      
-      
       file.open();
       
       ByteBuffer buffer = factory.newBuffer(512);
@@ -172,10 +168,6 @@
       
       
       file.close();
-      
-      
-      
-      
    }
    
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java	2008-06-05 09:28:53 UTC (rev 4390)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java	2008-06-05 12:18:05 UTC (rev 4391)
@@ -6,11 +6,10 @@
 
 public class FakeCallback implements IOCallback
 {
-
-   String msg;
-   CountDownLatch latch;
+   volatile String msg;
+   final CountDownLatch latch;
    
-   public FakeCallback(CountDownLatch latch)
+   public FakeCallback(final CountDownLatch latch)
    {
       this.latch = latch;
    }
@@ -25,7 +24,7 @@
       latch.countDown();
    }
 
-   public void onError(int errorCode, String errorMessage)
+   public void onError(final int errorCode, final String errorMessage)
    {
       latch.countDown();
       this.msg = errorMessage;




More information about the jboss-cvs-commits mailing list