[jboss-cvs] JBoss Messaging SVN: r4175 - in trunk: src/etc and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 13 05:25:51 EDT 2008


Author: timfox
Date: 2008-05-13 05:25:51 -0400 (Tue, 13 May 2008)
New Revision: 4175

Modified:
   trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
   trunk/src/etc/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
   trunk/src/main/org/jboss/messaging/util/TypedProperties.java
   trunk/src/main/org/jboss/messaging/util/VariableLatch.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Mainy small tweaks,  reformatting and cosmetic changes to new AIO stuff


Modified: trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-05-13 09:25:51 UTC (rev 4175)
@@ -8,8 +8,8 @@
 extern "C" {
 #endif
 /* Inaccessible static: log */
+/* Inaccessible static: totalMaxIO */
 /* Inaccessible static: loaded */
-/* Inaccessible static: totalMaxIO */
 /*
  * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
  * Method:    init

Modified: trunk/src/etc/jbm-configuration.xml
===================================================================
--- trunk/src/etc/jbm-configuration.xml	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/etc/jbm-configuration.xml	2008-05-13 09:25:51 UTC (rev 4175)
@@ -21,7 +21,7 @@
       
       <remoting-host>localhost</remoting-host>
 
-      <!--  timeout in seconds -->
+      <!--  timeout in milliseconds -->
       <remoting-timeout>5000</remoting-timeout>
       
       <!-- true to disable invm communication when the client and the server are in the same JVM.     -->
@@ -67,6 +67,7 @@
       
       <journal-sync>true</journal-sync>
       
+      <!-- 10 MB journal file size -->
       <journal-file-size>104857600</journal-file-size>
       
       <journal-min-files>2</journal-min-files>

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -9,15 +9,13 @@
 
 import java.nio.ByteBuffer;
 
-
 /**
  * 
  * @author clebert.suconic at jboss.com
  *
  */
 public interface AsynchronousFile
-{
-	
+{	
 	void close() throws Exception;
 	
 	/**

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -27,88 +27,94 @@
  */
 public class AsynchronousFileImpl implements AsynchronousFile
 {
-	private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+   // Static 
+   // -------------------------------------------------------------------------------
+   
+   private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+   
+   private static AtomicInteger totalMaxIO = new AtomicInteger(0);
+   
+   private static boolean loaded = false;
+      
+   static void addMax(int io)
+   {
+      totalMaxIO.addAndGet(io);
+   }
+   
+   /** For test purposes */
+   public static int getTotalMaxIO()
+   {
+      return totalMaxIO.get();
+   }
+   
+   private static boolean loadLibrary(String name) 
+   {
+      try
+      {
+         log.trace(name + " being loaded");
+         System.loadLibrary(name);
+         return isNativeLoaded();
+      }
+      catch (Throwable e)
+      {
+         log.trace(name + " -> error loading it", e);
+         return false;
+      }
+      
+   }
+   
+   static
+   {
+      String libraries[] = new String[] {"JBMLibAIO", "JBMLibAIO32", "JBMLibAIO64"};
+            
+      for (String library: libraries)
+      {
+         if (loadLibrary(library))
+         {
+            loaded = true;
+            break;
+         }
+         else
+         {
+            log.debug("Library " + library + " not found!");
+         }
+      }
+      
+      if (!loaded)
+      {
+         log.debug("Couldn't locate LibAIO Wrapper");
+      }
+   }
+   
+   public static boolean isLoaded()
+   {
+      return loaded;
+   }
+   
+   // Attributes
+   // ---------------------------------------------------------------------------------
+		
 	private boolean opened = false;
 	private String fileName;
-	private Thread poller;
-	private static boolean loaded = false;
-	private int maxIO;
+	private Thread poller;	
+	private int maxIO;	
+	private Semaphore writeSemaphore;	
+	private ReadWriteLock lock = new ReentrantReadWriteLock();
+	private Lock writeLock = lock.writeLock();	
+	private Semaphore pollerSemaphore = new Semaphore(1);
 	
-	private static AtomicInteger totalMaxIO = new AtomicInteger(0);
-	
-	static void addMax(int io)
-	{
-	   totalMaxIO.addAndGet(io);
-	}
-
-	/** For test purposes */
-	public static int getTotalMaxIO()
-	{
-	   return totalMaxIO.get();
-	}
-	
-	Semaphore writeSemaphore;
-	
-	ReadWriteLock lock = new ReentrantReadWriteLock();
-	Lock writeLock = lock.writeLock();
-	
-	Semaphore pollerSemaphore = new Semaphore(1);
-	
 	/**
 	 *  Warning: Beware of the C++ pointer! It will bite you! :-)
 	 */ 
 	private long handler;
 	
-	private static boolean loadLibrary(String name) 
-	{
-		try
-		{
-			log.trace(name + " being loaded");
-			System.loadLibrary(name);
-			return isNativeLoaded();
-		}
-		catch (Throwable e)
-		{
-			log.trace(name + " -> error loading it", e);
-			return false;
-		}
-		
-	}
 	
-	static
-	{
-		String libraries[] = new String[] {"JBMLibAIO", "JBMLibAIO32", "JBMLibAIO64"};
-		
-		
-		for (String library: libraries)
-		{
-			if (loadLibrary(library))
-			{
-				loaded = true;
-				break;
-			}
-			else
-			{
-				log.debug("Library " + library + " not found!");
-			}
-		}
-		
-		if (!loaded)
-		{
-			log.debug("Couldn't locate LibAIO Wrapper");
-		}
-	}
 	
-	public static boolean isLoaded()
+	// AsynchronousFile implementation
+	// ------------------------------------------------------------------------------------
+			
+	public void open(final String fileName, final int maxIO)
 	{
-		return loaded;
-	}
-	
-	
-	
-	
-	public void open(String fileName, int maxIO)
-	{
 		try
 		{
 			writeLock.lock();
@@ -131,27 +137,7 @@
 			writeLock.unlock();
 		}
 	}
-	
-	class PollerThread extends Thread
-	{
-		PollerThread ()
-		{
-			super("NativePoller for " + fileName);
-		}
-		public void run()
-		{
-			// informing caller that this thread already has the lock
-			try
-			{
-				pollEvents();
-			}
-			finally
-			{
-				pollerSemaphore.release();
-			}
-		}
-	}
-	
+			
 	public void close() throws Exception
 	{
 		checkOpened();
@@ -174,9 +160,8 @@
 			pollerSemaphore.release();
 		}
 	}
-	
-	
-	public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+		
+	public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
 		this.writeSemaphore.acquireUninterruptibly();
@@ -192,7 +177,7 @@
 		
 	}
 	
-	public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+	public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
 		this.writeSemaphore.acquireUninterruptibly();
@@ -204,8 +189,7 @@
 		{
 			writeSemaphore.release();
 			throw e;
-		}
-		
+		}		
 	}
 	
 	public long size()
@@ -226,6 +210,8 @@
 		return 512;
 	}
 	
+	// Private
+	// ---------------------------------------------------------------------------------
 	
 	/** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
@@ -251,7 +237,7 @@
 		internalPollEvents(handler);
 	}
 	
-	private synchronized void  startPoller()
+	private synchronized void startPoller()
 	{
 		checkOpened();
 		
@@ -267,8 +253,6 @@
 		}
 	}
 	
-	
-	
 	private void checkOpened() 
 	{
 		if (!opened)
@@ -277,6 +261,9 @@
 		}
 	}
 	
+	// Native
+	// ------------------------------------------------------------------------------------------
+	
 	/** 
 	 * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
 	 */
@@ -305,8 +292,27 @@
 	// Should we make this method static?
 	public native ByteBuffer newBuffer(long size);
 	
+		
+	// Inner classes
+	// -----------------------------------------------------------------------------------------
 	
-	
-	
-	
+	private class PollerThread extends Thread
+   {
+      PollerThread ()
+      {
+         super("NativePoller for " + fileName);
+      }
+      public void run()
+      {
+         // informing caller that this thread already has the lock
+         try
+         {
+            pollEvents();
+         }
+         finally
+         {
+            pollerSemaphore.release();
+         }
+      }
+   }	
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -114,9 +114,7 @@
       
       this.remotingConnection = remotingConnection;
       
-      //This is always true since the MinaHandler will use an executor based on session id
-      //TODO can remove this
-      this.direct = true;
+      this.direct = direct;
       
       this.tokenBatchSize = tokenBatchSize;
    }

Modified: trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -1,3 +1,24 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
 package org.jboss.messaging.core.journal;
 
 import org.jboss.messaging.util.MessagingBuffer;
@@ -2,9 +23,18 @@
 
-/** 
+/**
+ * 
  * This class provides encoding support for the Journal.
- * */
+ * 
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
 public interface EncodingSupport
 {
    int encodeSize();
+   
    void encode(MessagingBuffer buffer);
+   
+   void decode(MessagingBuffer buffer);
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -27,7 +27,7 @@
  * 
  * A SequentialFile
  * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>Journal
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
  */
 public interface SequentialFile
@@ -47,8 +47,7 @@
    
    void delete() throws Exception;
    
-   int write(ByteBuffer bytes, boolean sync, IOCallback callback)
-         throws Exception;
+   int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
    
    int write(ByteBuffer bytes, boolean sync) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -22,26 +22,36 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
 
+/**
+ * 
+ * A AIOSequentialFile
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
 public class AIOSequentialFile implements SequentialFile
 {
    private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
 
-   String journalDir;
-	String fileName;
-	boolean opened = false;
-	int maxIO;
+   private final String journalDir;
+   
+	private final String fileName;
 	
-	AsynchronousFile aioFile;
+	private boolean opened = false;
 	
-	AtomicLong position = new AtomicLong(0);
+	private final int maxIO;
+	
+	private AsynchronousFile aioFile;
+	
+	private AtomicLong position = new AtomicLong(0);
 
-	// A context switch on AIO would make it to synchronize the disk before switching to the new thread, what would cuase
+	// A context switch on AIO would make it to synchronize the disk before switching to the new thread, what would cause
 	// serious performance problems. Because of that we make all the writes on AIO using a single thread.
-	ExecutorService executor;
+	private ExecutorService executor;
 	
-	public AIOSequentialFile(String journalDir, String fileName, int maxIO) throws Exception
+	public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
 	{
-		this.journalDir = journalDir;
+		this.journalDir = journalDir;		
 		this.fileName = fileName;
 		this.maxIO = maxIO;
 	}
@@ -49,6 +59,7 @@
 	public int getAlignment() throws Exception
 	{
 		checkOpened();
+		
 		return aioFile.getBlockSize();
 	}
 	
@@ -60,9 +71,7 @@
 		
 		return pos;
 	}
-	
-	
-	
+			
 	public synchronized void close() throws Exception
 	{
 		checkOpened();
@@ -70,8 +79,7 @@
       executor.shutdown();
       executor.awaitTermination(120, TimeUnit.SECONDS);
 		aioFile.close();
-		aioFile = null;
-		
+		aioFile = null;		
 	}
 	
 	public void delete() throws Exception
@@ -86,8 +94,7 @@
 		file.delete();
 	}
 	
-	public void fill(int position, int size, byte fillCharacter)
-	throws Exception
+	public void fill(int position, final int size, final byte fillCharacter) throws Exception
 	{
 		checkOpened();
 		
@@ -97,22 +104,21 @@
 		{
 			blockSize = 10*1024*1024;
 		}
+		else if (size % (1024*1024) == 0)
+		{
+			blockSize = 1024*1024;
+		}
+		else if (size % (10*1024) == 0)
+		{
+			blockSize = 10*1024;
+		}
 		else
-			if (size % (1024*1024) == 0)
-			{
-				blockSize = 1024*1024;
-			}
-			else
-				if (size % (10*1024) == 0)
-				{
-					blockSize = 10*1024;
-				}
-				else
-				{
-					blockSize = aioFile.getBlockSize();
-				}
+		{
+			blockSize = aioFile.getBlockSize();
+		}
 		
 		int blocks = size / blockSize;
+		
 		if (size % blockSize != 0)
 		{
 			blocks++;
@@ -122,8 +128,8 @@
 		{
 			position = ((position / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
 		}
-		aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
 		
+		aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);		
 	}
 	
 	public String getFileName()
@@ -141,26 +147,28 @@
 		
 	}
 	
-	public void position(int pos) throws Exception
+	public void position(final int pos) throws Exception
 	{
-		position.set(pos);
-		
+		position.set(pos);		
 	}
 	
-	public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+	public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
 	{
 		int bytesToRead = bytes.limit();
+		
 		long positionToRead = position.getAndAdd(bytesToRead);
 		
 		bytes.rewind();
+		
 		aioFile.read(positionToRead, bytesToRead, bytes, callback);
 		
 		return bytesToRead;
 	}
 	
-	public int read(ByteBuffer bytes) throws Exception
+	public int read(final ByteBuffer bytes) throws Exception
 	{
 		WaitCompletion waitCompletion = new WaitCompletion();
+		
 		int bytesRead = read (bytes, waitCompletion);
 		
 		waitCompletion.waitLatch();
@@ -174,10 +182,10 @@
 	}
 	
 	
-	public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback)
-	throws Exception
+	public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback) throws Exception
 	{
 		final int bytesToWrite = bytes.limit();
+		
 		final long positionToWrite = position.getAndAdd(bytesToWrite);
 		
 		execWrite(bytes, callback, bytesToWrite, positionToWrite);
@@ -186,7 +194,7 @@
 	}
 
    private void execWrite(final ByteBuffer bytes, final IOCallback callback,
-         final int bytesToWrite, final long positionToWrite)
+                          final int bytesToWrite, final long positionToWrite)
    {
       executor.execute(new Runnable()
 		{
@@ -205,11 +213,10 @@
 		         }
 		      }
 		   }
-		});
-      
+		});      
    }
 	
-	public int write(ByteBuffer bytes, boolean sync) throws Exception
+	public int write(final ByteBuffer bytes, final boolean sync) throws Exception
 	{
 		return write (bytes, sync, DummyCallback.instance);
 	}
@@ -222,9 +229,8 @@
 		}
 	}
 
-	static class DummyCallback implements IOCallback
-	{
-	   
+	private static class DummyCallback implements IOCallback
+	{	   
 	   static DummyCallback instance = new DummyCallback();
 
       public void done()
@@ -233,16 +239,15 @@
 
       public void onError(int errorCode, String errorMessage)
       {
-      }
-	   
+      }	   
 	}
 	
-	class WaitCompletion implements IOCallback
-	{
-		
+	private static class WaitCompletion implements IOCallback
+	{		
 		CountDownLatch latch = new CountDownLatch(1);
 		
 		String errorMessage;
+		
 		int errorCode = 0;
 		
 		public void done()
@@ -250,20 +255,18 @@
 			latch.countDown();
 		}
 		
-		public void onError(int errorCode, String errorMessage)
+		public void onError(final int errorCode, final String errorMessage)
 		{
 			this.errorCode = errorCode;
+			
 			this.errorMessage = errorMessage;
 			
-			latch.countDown();
-			
+			latch.countDown();			
 		}
 		
 		public void waitLatch() throws Exception
 		{
 			latch.await();
-		}
-		
-	}
-	
+		}		
+	}	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -12,15 +12,21 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 
+/**
+ * 
+ * A AIOSequentialFileFactory
+ * 
+ * @author clebert.suconic at jboss.com
+ *
+ */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
-{
-	
-	public AIOSequentialFileFactory(String journalDir)
+{	
+	public AIOSequentialFileFactory(final String journalDir)
 	{
 		super(journalDir);
 	}
 	
-	public SequentialFile createSequentialFile(String fileName, boolean sync, int maxIO) throws Exception
+	public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO) throws Exception
 	{
 		return new AIOSequentialFile(journalDir, fileName, maxIO);
 	}
@@ -45,11 +51,10 @@
    }
    
    // For tests only
-   public ByteBuffer wrapBuffer(byte[] bytes)
+   public ByteBuffer wrapBuffer(final byte[] bytes)
    {
       ByteBuffer newbuffer = newBuffer(bytes.length);
       newbuffer.put(bytes);
       return newbuffer;
    };
-   
-}
+   }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -66,6 +66,7 @@
 public class JournalImpl implements TestableJournal
 {
 	private static final Logger log = Logger.getLogger(JournalImpl.class);
+	
 	private static final boolean trace = log.isTraceEnabled();
 	
 	private static final int STATE_STOPPED = 0;
@@ -160,10 +161,9 @@
 
 	private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
 	
-	private boolean shouldUseCallback = false;
+	private final boolean shouldUseCallback;
    
-	
-	
+		
 	/*
 	 * We use a semaphore rather than synchronized since it performs better when contended
 	 */
@@ -186,8 +186,8 @@
 	private Reclaimer reclaimer = new Reclaimer();
 	
 	public JournalImpl(final int fileSize, final int minFiles,
-			final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
-			final String filePrefix, final String fileExtension, final int maxAIO)
+			             final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
+			             final String filePrefix, final String fileExtension, final int maxAIO)
 	{
 		if (fileSize < MIN_FILE_SIZE)
 		{
@@ -239,7 +239,7 @@
 	
 	// Journal implementation ----------------------------------------------------------------
 
-	public void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception
+	public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -276,7 +276,7 @@
       posFilesMap.put(id, new PosFiles(usedFile));
    }
 	
-	public void appendAddRecord(long id, byte recordType, byte[] record) throws Exception
+	public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
 	{
 		if (state != STATE_LOADED)
 		{
@@ -296,9 +296,9 @@
 		bb.rewind();
 		
 		JournalFile usedFile;
+		
       if (shouldUseCallback)
-      {
-         
+      {         
          SimpleCallback callback = new SimpleCallback();
          usedFile = appendRecord(bb, true, callback);
          callback.waitCompletion();
@@ -348,8 +348,7 @@
       {
          usedFile = appendRecord(bb, true);
       }
-      
-		
+      		
 		posFiles.addUpdateFile(usedFile);
 	}
 	
@@ -1175,8 +1174,7 @@
 	{
 		JournalFile[] files = new JournalFile[dataFiles.size()];
 		
-		reclaimer.scan(dataFiles.toArray(files));
-		
+		reclaimer.scan(dataFiles.toArray(files));		
 	}
 	
    public String debug() throws Exception
@@ -1197,8 +1195,7 @@
       
       builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
       builder.append(((JournalFileImpl)currentFile).debug());
-      
-      
+            
       return builder.toString();
    }
 
@@ -1343,7 +1340,7 @@
 	
 	// Private -----------------------------------------------------------------------------
 	
-	private JournalFile appendRecord(ByteBuffer bb, boolean sync) throws Exception
+	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync) throws Exception
 	{
 		lock.acquire();
 		
@@ -1362,7 +1359,7 @@
 		}
 	}
 	
-	private JournalFile appendRecord(ByteBuffer bb, boolean sync, IOCallback callback) throws Exception
+	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
 	{
 		lock.acquire();
 		
@@ -1381,7 +1378,7 @@
 		}
 	}
 	
-	private void repairFrom(int pos, JournalFile file) throws Exception
+	private void repairFrom(final int pos, final JournalFile file) throws Exception
 	{
 		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
 				" in the record that starts at position " + pos + ". " + 
@@ -1444,12 +1441,12 @@
 	}
 	
 	private void checkFile(final int size) throws Exception
-	{
-		
+	{		
 		if (size % currentFile.getFile().getAlignment() != 0)
 		{
 			throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
 		}
+		
 		//We take into account the first timestamp long
 		if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
 		{
@@ -1488,7 +1485,7 @@
 		return tx;
 	}
 	
-   private TransactionCallback getTransactionCallback(long transactionId)
+   private TransactionCallback getTransactionCallback(final long transactionId)
    {
       TransactionCallback callback = this.transactionCallbacks.get(transactionId);
       
@@ -1501,18 +1498,10 @@
       return callback;
    }
    
-   private void removeTransactionCallback(long transactionId)
-   {
-      transactionCallbacks.remove(transactionId);
-   }
-   
-	
-	
 	// Inner classes ---------------------------------------------------------------------------
 
-   class SimpleCallback implements IOCallback
-   {
-      
+   private static class SimpleCallback implements IOCallback
+   {      
       String errorMessage;
       int errorCode;
       CountDownLatch latch = new CountDownLatch(1);
@@ -1522,12 +1511,11 @@
          latch.countDown();
       }
 
-      public void onError(int errorCode, String errorMessage)
+      public void onError(final int errorCode, final String errorMessage)
       {
          this.errorMessage = errorMessage;
          this.errorCode = errorCode;
-         latch.countDown();
-         
+         latch.countDown();         
       }
       
       public void waitCompletion() throws InterruptedException 
@@ -1541,13 +1529,11 @@
          {
             throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
          }
-     }
-      
+     }      
    }
    
-   class TransactionCallback implements IOCallback
-   {
-      
+   private static class TransactionCallback implements IOCallback
+   {      
       VariableLatch countLatch = new VariableLatch();
       
       String errorMessage = null;
@@ -1573,7 +1559,7 @@
          }
       }
 
-      public void onError(int errorCode, String errorMessage)
+      public void onError(final int errorCode, final String errorMessage)
       {
          this.errorMessage = errorMessage;
          this.errorCode = errorCode;

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -67,12 +67,11 @@
 		return 1;
 	}
 	
-	public int calculateBlockStart(int position) throws Exception
+	public int calculateBlockStart(final int position) throws Exception
 	{
 		return position;
-	}
+	}	
 	
-	
 	public String getFileName()
 	{
 		return fileName;
@@ -127,12 +126,12 @@
 		close();    
 	}
 	
-	public int read(ByteBuffer bytes) throws Exception
+	public int read(final ByteBuffer bytes) throws Exception
 	{
 		return read(bytes, null);
 	}
 	
-	public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+	public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
 	{
 		try
 		{
@@ -156,12 +155,12 @@
 		
 	}
 	
-	public int write(ByteBuffer bytes, boolean sync) throws Exception
+	public int write(final ByteBuffer bytes, final boolean sync) throws Exception
 	{
 		return write(bytes, sync, null);
 	}
 	
-	public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+	public int write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
 	{
 		int bytesRead = channel.write(bytes);
 		

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -21,26 +21,10 @@
  */
 package org.jboss.messaging.core.message.impl;
 
-import static org.jboss.messaging.util.DataConstants.BOOLEAN;
-import static org.jboss.messaging.util.DataConstants.BYTE;
-import static org.jboss.messaging.util.DataConstants.BYTES;
-import static org.jboss.messaging.util.DataConstants.CHAR;
-import static org.jboss.messaging.util.DataConstants.DOUBLE;
-import static org.jboss.messaging.util.DataConstants.FLOAT;
-import static org.jboss.messaging.util.DataConstants.INT;
-import static org.jboss.messaging.util.DataConstants.LONG;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.SHORT;
 import static org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
 import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
-import static org.jboss.messaging.util.DataConstants.SIZE_CHAR;
-import static org.jboss.messaging.util.DataConstants.SIZE_DOUBLE;
-import static org.jboss.messaging.util.DataConstants.SIZE_FLOAT;
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
-import static org.jboss.messaging.util.DataConstants.SIZE_SHORT;
-import static org.jboss.messaging.util.DataConstants.STRING;
 
 import java.util.Set;
 
@@ -128,17 +112,6 @@
 
    public void encode(MessagingBuffer buff)
    {
-//      buff.putSimpleString(destination);
-//      buff.putInt(type);
-//      buff.putBoolean(durable);
-//      buff.putLong(expiration);
-//      buff.putLong(timestamp);
-//      buff.putByte(priority);
-//      properties.encode(buff);
-//      buff.putInt(body.limit());
-//      buff.putBytes(body.array(), 0, body.limit());
-
-   
       buff.putSimpleString(destination);
       buff.putInt(type);
       buff.putBoolean(durable);
@@ -153,14 +126,6 @@
    
    public int encodeSize()
    {
-//      return /* Destination */ SimpleString.sizeofString(destination) + 
-//             /* Type */ SIZE_INT + 
-//             /* Durable */ SIZE_BOOLEAN + 
-//             /* Expiration */ SIZE_LONG + 
-//             /* Timestamp */ SIZE_LONG +
-//             /* Priority */  SIZE_BYTE + 
-//             /* PropertySize and Properties */ properties.encodeSize() + 
-//             /* BodySize and Body */ SIZE_INT + body.limit();
       return /* Destination */ SimpleString.sizeofString(destination) + 
       /* Type */ SIZE_INT + 
       /* Durable */ SIZE_BOOLEAN + 
@@ -168,8 +133,7 @@
       /* Timestamp */ SIZE_LONG + 
       /* Priority */ SIZE_BYTE + 
       /* PropertySize and Properties */ properties.encodeSize() + 
-      /* BodySize and Body */ SIZE_INT + body.limit();
-      
+      /* BodySize and Body */ SIZE_INT + body.limit();      
    }
    
    public void decode(final MessagingBuffer buffer)

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -134,14 +134,13 @@
             log.info("AIO loaded successfully");
          }
       }
-      else 
-      if (config.getJournalType() == JournalType.NIO)
+      else if (config.getJournalType() == JournalType.NIO)
       {
          journalFF = new NIOSequentialFileFactory(bindingsDir);
       }
-      else
-      if (config.getJournalType() == JournalType.JDBC)
-      { // Sanity check only... this is previously tested
+      else if (config.getJournalType() == JournalType.JDBC)
+      {
+         // Sanity check only... this is previously tested
          throw new IllegalArgumentException("JDBC Journal is not supported yet");
       }
 	      

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -7,6 +7,7 @@
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.util.MessagingBuffer;
@@ -21,6 +22,9 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(ProducerSendMessage.class);
+
+   
    // Attributes ----------------------------------------------------
 
    private ClientMessage clientMessage;

Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -61,7 +61,6 @@
 public class TypedProperties implements EncodingSupport
 {  
 	private static final Logger log = Logger.getLogger(TypedProperties.class);
-
 	
 	private Map<SimpleString, PropertyValue> properties;
 	

Modified: trunk/src/main/org/jboss/messaging/util/VariableLatch.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/VariableLatch.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/src/main/org/jboss/messaging/util/VariableLatch.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -25,7 +25,9 @@
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 
-/** This class will use the framework provided to by AbstractQueuedSynchronizer.
+/**
+ * 
+ * This class will use the framework provided to by AbstractQueuedSynchronizer.
  * AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.
  * 
  * The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())
@@ -41,20 +43,18 @@
     * @see AbstractQueuedSynchronizer*/
    @SuppressWarnings("serial")
    private static class CountSync extends AbstractQueuedSynchronizer
-   {
-      
+   {      
       public CountSync ()
       {
          setState(0);
       }
-      
-      
+            
       public int getCount()
       {
          return getState();
       }
       
-      public int tryAcquireShared(int numberOfAqcquires)
+      public int tryAcquireShared(final int numberOfAqcquires)
       {
          return getState()==0 ? 1 : -1;
       }
@@ -71,9 +71,8 @@
             }
          }
       }
-      
-      
-      public boolean tryReleaseShared(int numberOfReleases)
+            
+      public boolean tryReleaseShared(final int numberOfReleases)
       {
          for (;;)
          {
@@ -93,9 +92,8 @@
       }
    }
    
-   CountSync control = new CountSync();
-   
-   
+   private CountSync control = new CountSync();
+      
    public int getCount()
    {
       return control.getCount();
@@ -116,7 +114,7 @@
       control.acquireSharedInterruptibly(1);
    }
    
-   public void waitCompletion(int seconds) throws InterruptedException
+   public void waitCompletion(final int seconds) throws InterruptedException
    {
       if (!control.tryAcquireSharedNanos(1, TimeUnit.SECONDS.toNanos(seconds)))
       {

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -81,8 +81,6 @@
             
             prod.send(tm);
             
-            log.info("sent message");
-            
             if (i % 10 == 0)
             {
                sess.commit();

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -258,13 +258,9 @@
          MessageConsumer cons2 = sessConsume2.createConsumer(queue1);
    
          // this should cancel message and cause delivery to other consumer
-   
-         log.info("closing session");
-         
+      
          sessConsume1.close();
          
-         log.info("closed session");
-   
          TextMessage tm3 = (TextMessage)cons2.receive(1000);
    
          assertNotNull(tm3);
@@ -814,16 +810,10 @@
 	      
 	      assertEquals("One", m.getText());
 	
-	      log.info("Closing consumer");
 	      queueConsumer.close();
-	      log.info("Closed consumer");
-	      
-	      log.info("Committing session");
+
 	      consumerSession.commit();
-	      log.info("Committed session");
-	      
-	      
-	
+
 	      // I expect that "Two" is still in the queue
 	
 	      MessageConsumer queueConsumer2 = consumerSession.createConsumer(queue1);
@@ -1463,7 +1453,6 @@
 	                     failed = true;
 	                     break;
 	                  }
-	                  log.info("received message");
 	                  if (!m.getText().equals("testing"))
 	                  {
 	                     failed = true;
@@ -3987,11 +3976,8 @@
          TextMessage tm = (TextMessage)m;
          count++;
 
-         log.info(this + " Got message:" + count);
-         
          try
-         {
-            log.info(this + " message:" + tm.getText());
+         {;
             if (count == 1)
             {
                if (!("a".equals(tm.getText())))
@@ -3999,12 +3985,10 @@
                   failed("Should be a but was " + tm.getText());
                   latch.release();
                }
-               log.info("Throwing exception");
                throw new RuntimeException("Aardvark");
             }
             else if (count == 2)
             {
-               log.info("ack mode:" + sess.getAcknowledgeMode());
                if (sess.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE || sess.getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE)
                {
                   //Message should be immediately redelivered

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-12 21:53:04 UTC (rev 4174)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-13 09:25:51 UTC (rev 4175)
@@ -85,6 +85,22 @@
       conn.close();
    }
    
+   public static void main(String[] args)
+   {
+      try
+      {
+         CoreClientTest test = new CoreClientTest();
+         
+         test.setUp();
+         test.testCoreClientPerf();
+         test.tearDown();
+      }
+      catch (Throwable t)
+      {
+         t.printStackTrace();
+      }
+   }
+   
    public void testCoreClientPerf() throws Exception
    {
       Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
@@ -94,7 +110,7 @@
       
       ClientConnection conn = cf.createConnection();
       
-      ClientSession session = conn.createClientSession(false, true, false, -1, false, false);
+      final ClientSession session = conn.createClientSession(false, true, true, 1000, false, false);
       session.createQueue(QUEUE, QUEUE, null, false, false);
       
       ClientProducer producer = session.createProducer(QUEUE);
@@ -102,6 +118,13 @@
       ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
             System.currentTimeMillis(), (byte) 1);
       
+      //byte[] bytes = new byte[1000];
+      
+      //message.getBody().putBytes(bytes);
+      
+      message.getBody().flip();
+      
+      
       ClientConsumer consumer = session.createConsumer(QUEUE, null, false, false, true);
             
       final CountDownLatch latch = new CountDownLatch(1);
@@ -115,46 +138,71 @@
          public void onMessage(ClientMessage msg)
          {
             count++;
+            
+            try
+            {
+               session.acknowledge();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
 
             if (count == numMessages)
             {
                latch.countDown();
-            }
+            }                        
          }            
       }
 
       consumer.setMessageHandler(new MyHandler());
+            
+      //System.out.println("Waiting 10 secs");
       
+     // Thread.sleep(10000);
       
+      System.out.println("Starting");
       
-      
-      
-      
+      //conn.start();
+                  
+      long start = System.currentTimeMillis();
+            
       for (int i = 0; i < numMessages; i++)
       {      
          producer.send(message);
       }
       
+      long end = System.currentTimeMillis();
+      
+      double actualRate = 1000 * (double)numMessages / ( end - start);
+      
+      System.out.println("Rate is " + actualRate);
+      
+      conn.start();
+      
+      start = System.currentTimeMillis();
+      
+      latch.await();
+      
 //      long end = System.currentTimeMillis();
 //
 //      double actualRate = 1000 * (double)numMessages / ( end - start);
 //                  
 //      System.out.println("Rate is " + actualRate);
 
-      conn.start();
+      //conn.start();
       
-      long start = System.currentTimeMillis();
-      
+
       //start = System.currentTimeMillis();
 
-      latch.await();
       
-      long end = System.currentTimeMillis();
       
-      double actualRate = 1000 * (double)numMessages / ( end - start);
+      end = System.currentTimeMillis();
       
-      System.out.println("Rate is " + actualRate);
+      actualRate = 1000 * (double)numMessages / ( end - start);
       
+      System.out.println(" consume Rate is " + actualRate);
+      
 //      
 //      message = consumer.receive(1000);
 //      




More information about the jboss-cvs-commits mailing list