[jboss-cvs] JBoss Messaging SVN: r4844 - in branches/Branch_JBMESSAGING-1314: native/src and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 19 21:19:55 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-19 21:19:54 -0400 (Tue, 19 Aug 2008)
New Revision: 4844

Added:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java
Modified:
   branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO32.so
   branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
   branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp
   branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp
   branches/Branch_JBMESSAGING-1314/native/src/Version.h
   branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Experimental buffer reuse

Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO32.so
===================================================================
(Binary files differ)

Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)

Modified: branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp	2008-08-20 01:19:54 UTC (rev 4844)
@@ -36,7 +36,7 @@
 
 void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
 {
-	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback); 
+	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, bufferReference); 
 	return;
 }
 

Modified: branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp	2008-08-20 01:19:54 UTC (rev 4844)
@@ -50,7 +50,7 @@
 		std::string fileName = convertJavaString(env, jstrFileName);
 		
 		AIOController * controller = new AIOController(fileName, (int) maxIO);
-		controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;)V");
+		controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
 		if (!controller->done) return 0;
 		
 		controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
@@ -105,6 +105,23 @@
 	}
 }
 
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+  (JNIEnv *env, jclass, jobject jbuffer, jint size) // native side of the static Java method resetBuffer(ByteBuffer, int)
+{
+	void * buffer = env->GetDirectBufferAddress(jbuffer); // raw memory address behind the Java buffer
+	// A zero address is treated as "jbuffer is not a usable direct buffer".
+	if (buffer == 0)
+	{
+		throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+		return; // presumably throwException leaves the Java exception pending -- TODO confirm
+	}
+	// NOTE(review): size is not checked against the buffer capacity (nor for being negative) before the memset -- confirm callers.
+	memset(buffer, 0, (size_t)size); // zero-fill the first `size` bytes of the buffer
+	
+}
+
+
+
 JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
   (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
 {
@@ -182,8 +199,7 @@
 		AIOController * controller = (AIOController *) controllerAddress;
 		
 		controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
-		
-		//controller->fileOutput.preAllocate(env, blocks, size);
+
 	}
 	catch (AIOException& e)
 	{

Modified: branches/Branch_JBMESSAGING-1314/native/src/Version.h
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/Version.h	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/Version.h	2008-08-20 01:19:54 UTC (rev 4844)
@@ -1,5 +1,5 @@
 
 #ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 10
+#define _VERSION_NATIVE_AIO 12
 #endif
 

Modified: branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-08-20 01:19:54 UTC (rev 4844)
@@ -13,6 +13,14 @@
 /* Inaccessible static: EXPECTED_NATIVE_VERSION */
 /*
  * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    resetBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+  (JNIEnv *, jclass, jobject, jint);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
  * Method:    init
  * Signature: (Ljava/lang/String;ILorg/jboss/messaging/core/logging/Logger;)J
  */

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -54,6 +54,8 @@
 	
 	ByteBuffer newBuffer(int size);
 	
+	void setBufferCallback(BufferCallback callback);
+	
 	int getBlockSize();
 	
 	String getFileName();

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.asyncio;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Listener that is handed the ByteBuffer of an I/O operation once that operation is done with it.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface BufferCallback
+{
+   void bufferDone(ByteBuffer buffer); // buffer: the buffer the finished operation was using
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -32,6 +32,7 @@
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.asyncio.BufferCallback;
 import org.jboss.messaging.core.logging.Logger;
 
 
@@ -54,7 +55,7 @@
    
    private static boolean loaded = false;
    
-   private static int EXPECTED_NATIVE_VERSION = 10;
+   private static int EXPECTED_NATIVE_VERSION = 12;
       
    static void addMax(int io)
    {
@@ -129,6 +130,7 @@
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
 	private Lock writeLock = lock.writeLock();
    private Semaphore writeSemaphore;   
+   private BufferCallback bufferCallback;
 	
 	/**
 	 *  Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -256,16 +258,25 @@
       return ByteBuffer.allocateDirect((int)size);
    }
    
+   public void setBufferCallback(BufferCallback callback)
+   {
+      this.bufferCallback = callback; // callbackDone hands each finished buffer to it; null disables the notification
+   }
+
       
 	// Private
 	// ---------------------------------------------------------------------------------
 	
-	/** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+   /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
-	private void callbackDone(final AIOCallback callback)
+	private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
 	{
       writeSemaphore.release();
 		callback.done();
+		if (this.bufferCallback != null)
+		{
+		   this.bufferCallback.bufferDone(buffer);
+		}
 	}
 	
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
@@ -311,6 +322,8 @@
 	// Native
 	// ------------------------------------------------------------------------------------------
 	
+	public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+	
 	private static native long init(String fileName, int maxIO, Logger logger);
 	
 	private native long size0(long handle);

Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal;
+
+/**
+ * Journal-level view of the asyncio BufferCallback; it declares no members of its own.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface BufferCallback extends org.jboss.messaging.core.asyncio.BufferCallback
+{
+   // No additional members: bufferDone(ByteBuffer) is inherited from the parent interface.
+}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -46,6 +46,8 @@
     */
    void open(int maxIO) throws Exception;
    
+   void setBufferCallback(BufferCallback callback);   
+   
    int getAlignment() throws Exception;
    
    int calculateBlockStart(int position) throws Exception;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -48,6 +48,8 @@
    
    int getAlignment();
    
-   int calculateBlockSize(int bytes) throws Exception;
+   int calculateBlockSize(int bytes);
    
+   void cleanBuffer(ByteBuffer buffer);
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.asyncio.AsynchronousFile;
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
@@ -181,6 +182,11 @@
       
    }
    
+   public void setBufferCallback(BufferCallback callback)
+   {
+      aioFile.setBufferCallback(callback); // pure delegation; presumably aioFile is only set after open() -- TODO confirm
+   }
+   
    public void position(final int pos) throws Exception
    {
       position.set(pos);		

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -65,6 +65,12 @@
       return ByteBuffer.allocateDirect(size);
    }
    
+   public void cleanBuffer(ByteBuffer directByteBuffer)
+   {
+      AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.capacity()); // native zero-fill of the whole capacity, independent of limit
+   }
+   
+   
    public int getAlignment()
    {
       return 512;
@@ -78,7 +84,7 @@
       return newbuffer;
    }
 
-   public int calculateBlockSize(int position) throws Exception
+   public int calculateBlockSize(int position)
    {
       int alignment = getAlignment();
       

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -49,6 +49,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.LoadManager;
@@ -75,6 +76,8 @@
 public class JournalImpl implements TestableJournal
 {
    
+   private static final int REUSED_BUFFER_SIZE = 5 * 1024;
+   
    // Constants -----------------------------------------------------
    private static final int STATE_STOPPED = 0;
    
@@ -176,6 +179,10 @@
    
    private ExecutorService filesExecutor = null;
    
+   private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+   
+   private final BufferCallback bufferCallback = new LocalBufferCallback();
+   
    /*
     * We use a semaphore rather than synchronized since it performs better when
     * contended
@@ -271,7 +278,7 @@
       
       int size = SIZE_ADD_RECORD + recordLength;
       
-      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
       
       bb.putByte(ADD_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -305,7 +312,7 @@
             
       int size = SIZE_ADD_RECORD + record.length;
       
-      ByteBuffer bb = fileFactory.newBuffer(size);
+      ByteBuffer bb = newBuffer(size);
       
       bb.put(ADD_RECORD);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -347,7 +354,7 @@
       
       int size = SIZE_UPDATE_RECORD + record.length;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(UPDATE_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -387,7 +394,7 @@
       
       int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
       
-      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
       
       bb.putByte(UPDATE_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -428,7 +435,7 @@
       
       int size = SIZE_DELETE_RECORD;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(DELETE_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -467,7 +474,7 @@
       
       int size = SIZE_ADD_RECORD_TX + recordLength;
       
-      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size)); 
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size)); 
       
       bb.putByte(ADD_RECORD_TX);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -504,7 +511,7 @@
       
       int size = SIZE_ADD_RECORD_TX + record.length;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(ADD_RECORD_TX);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -541,7 +548,7 @@
       
       int size = SIZE_UPDATE_RECORD_TX + record.length; 
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(UPDATE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -569,6 +576,7 @@
       }
    }
    
+   
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
@@ -578,7 +586,7 @@
       
       int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize(); 
       
-      ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size)); 
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size)); 
             
       bb.putByte(UPDATE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -605,7 +613,7 @@
          lock.release();
       }
    }
-   
+
    public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
    {
       if (state != STATE_LOADED)
@@ -615,7 +623,7 @@
       
       int size = SIZE_DELETE_RECORD_TX;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(DELETE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -718,7 +726,7 @@
       
       int size = SIZE_ROLLBACK_RECORD;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(ROLLBACK_RECORD);      
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -1191,6 +1199,8 @@
       {     
          currentFile.getFile().open();
          
+         currentFile.getFile().setBufferCallback(bufferCallback);
+         
          currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
          
          currentFile.setOffset(lastDataPos);
@@ -1582,7 +1592,7 @@
    {
       int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
       
-      ByteBuffer bb = fileFactory.newBuffer(size); 
+      ByteBuffer bb = newBuffer(size); 
       
       bb.put(recordType);    
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -1709,7 +1719,7 @@
     * */
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {      
-      int size = bb.capacity();
+      int size = bb.limit();
       checkFile(size);
       bb.position(SIZE_BYTE);
       if (currentFile == null)
@@ -1734,6 +1744,7 @@
       return currentFile;
    }
    
+   
    private JournalFile createFile(final boolean keepOpened) throws Exception
    {
       int orderingID = generateOrderingID();
@@ -1772,6 +1783,8 @@
    {
       file.getFile().open();
       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+      file.getFile().setBufferCallback(bufferCallback);
+
    }
    
    private int generateOrderingID()
@@ -1943,7 +1956,61 @@
          return null;
       }
    }
+   // -- Area reserved for the reuse buffer logic -----------------------------------------
    
+   volatile long lastTime = System.currentTimeMillis(); // time (ms) of the latest bufferDone notification, initially construction time
+   private ByteBuffer newBuffer(int size) // returns a buffer for a record of `size` bytes, pooled when small enough
+   {
+      if (System.currentTimeMillis() - lastTime > 10000) // no buffer came back for more than 10 seconds: drop the pool
+      {
+         System.out.println("Clear!!!" + reuseBuffers.size()); // NOTE(review): debug output on stdout -- a logger may be intended
+         reuseBuffers.clear();
+      }
+      // NOTE(review): lastTime is not refreshed in this method, so the branch above repeats on every call until a bufferDone arrives.
+      if (size > REUSED_BUFFER_SIZE)
+      {
+         return fileFactory.newBuffer(size); // too large for the pool: plain allocation, never pooled
+      }
+      else
+      {
+         // Take a pooled buffer when one is available, otherwise allocate one of the pooled size.
+         ByteBuffer buffer = this.reuseBuffers.poll();
+         if (buffer == null)
+         {
+            buffer = fileFactory.newBuffer(REUSED_BUFFER_SIZE);
+         }
+         else
+         {
+            // Reset the buffer before reusing it
+            // Maybe we could make use of native methods (memset) to clean the buffer
+            buffer.limit(buffer.capacity()); // undo the limit left by the previous use before cleaning
+            fileFactory.cleanBuffer(buffer);
+         }
+         // Expose only the (factory-aligned) number of bytes this record needs.
+         buffer.limit(fileFactory.calculateBlockSize(size));
+         buffer.rewind();
+
+         return buffer;         
+      }
+   }
+   
+   private class LocalBufferCallback implements BufferCallback
+   {
+      /** Receives a buffer that is no longer in use and returns it to reuseBuffers when it has the pooled capacity. */
+      public void bufferDone(ByteBuffer buffer)
+      {
+         lastTime = System.currentTimeMillis(); // read by newBuffer for its 10-second idle check
+         if (buffer.capacity() == REUSED_BUFFER_SIZE) // buffers of any other capacity are simply not pooled
+         {
+            reuseBuffers.offer(buffer);
+         }
+      }
+      
+   }
+   
+   // ------------------------------------------------------------------------------------
+   
+   
    // Inner classes ---------------------------------------------------------------------------
    
    private static class TransactionCallback implements IOCallback

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -27,6 +27,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
@@ -53,6 +54,8 @@
    
    private RandomAccessFile rfile;
    
+   BufferCallback bufferCallback;
+   
    public NIOSequentialFile(final String journalDir, final String fileName)
    {
       this.journalDir = journalDir;
@@ -89,6 +92,13 @@
       open();
    }
    
+   
+   
+   public void setBufferCallback(BufferCallback callback)
+   {
+      this.bufferCallback = callback; // notified by this class's I/O methods after they finish with a buffer; null disables it
+   }
+
    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
    {
       ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -167,6 +177,11 @@
          sync();
       }
       
+      if (bufferCallback != null)
+      {
+         bufferCallback.bufferDone(bytes);
+      }
+      
       return bytesRead;
    }
    
@@ -180,7 +195,13 @@
          {
             callback.done();
          }
+
+         if (bufferCallback != null)
+         {
+            bufferCallback.bufferDone(bytes);
+         }
          
+         
          return bytesRead;
       }
       catch (Exception e)
@@ -189,7 +210,7 @@
          throw e;
       }
    }
-   
+      
    public void sync() throws Exception
    {
       channel.force(false);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -58,6 +58,22 @@
       return ByteBuffer.allocate(size);
    }
    
+   public void cleanBuffer(final ByteBuffer buffer) // zero-fills the entire buffer, then restores its limit
+   {
+      final int limit = buffer.limit(); // remembered so it can be restored after the fill
+      final int capacity = buffer.capacity();
+      buffer.limit(capacity); // open up the whole buffer so the puts below can reach every byte
+      buffer.rewind();
+      // Relative puts starting at position 0 overwrite all `capacity` bytes with zero.
+      for (int i = 0; i < capacity; i++)
+      {
+         buffer.put((byte)0);
+      }
+      // The caller's limit is restored; the position is left at 0, not at its previous value.
+      buffer.limit(limit);
+      buffer.rewind();
+   }
+   
    public ByteBuffer wrapBuffer(final byte[] bytes)
    {
       return ByteBuffer.wrap(bytes);
@@ -68,7 +84,7 @@
       return 1;
    }
 
-   public int calculateBlockSize(int bytes) throws Exception
+   public int calculateBlockSize(int bytes)
    {
       return bytes;
    }

Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * Checks that SequentialFileFactory.cleanBuffer zero-fills a whole buffer for the NIO, AIO and fake factories.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class CleanBufferTest extends UnitTestCase
+{
+   
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   
+   /** Runs the shared check against the NIO factory. */
+   public void testCleanOnNIO()
+   {
+      SequentialFileFactory factory = new NIOSequentialFileFactory("Whatever");
+
+      testBuffer(factory);
+   }
+   /** Runs the shared check against the AIO factory; does nothing (and passes) when isLoaded() is false. */
+   public void testCleanOnAIO()
+   {
+      if (AsynchronousFileImpl.isLoaded())
+      {
+         SequentialFileFactory factory = new AIOSequentialFileFactory("Whatever");
+   
+         testBuffer(factory);
+      }
+   }
+   /** Runs the shared check against the fake factory used by the unit tests. */
+   public void testCleanOnFake()
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+      testBuffer(factory);
+   }
+   /** Shared check: fill 100 bytes, verify them, clean with a reduced limit, then expect 100 zero bytes. */
+   private void testBuffer(SequentialFileFactory factory)
+   {
+      ByteBuffer buffer = factory.newBuffer(100);
+      for (byte b = 0; b < 100; b++)
+      {
+         buffer.put(b);
+      }
+      
+      buffer.rewind();
+      // Sanity check: the values 0..99 just written read back unchanged.
+      for (byte b = 0; b < 100; b++)
+      {
+         assertEquals(b, buffer.get());
+      }
+      
+      // The limit is lowered to 10 before cleaning; the loop below still expects bytes 10..99 to be zeroed.
+
+      buffer.limit(10);
+      factory.cleanBuffer(buffer);
+      buffer.limit(100);
+      
+      buffer.rewind();
+      
+      for (byte b = 0; b < 100; b++)
+      {
+         assertEquals(0, buffer.get());
+      }
+   }
+   
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+   
+}

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -29,6 +29,7 @@
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
@@ -456,6 +457,24 @@
             });
       
       
+      EasyMock.expect(mockFactory.calculateBlockSize(EasyMock.anyInt()))
+      .andStubAnswer(new IAnswer<Integer>()
+      {
+         // stub answer: hand the int argument back unchanged
+         public Integer answer() throws Throwable
+         {
+            return (Integer) EasyMock.getCurrentArguments()[0];
+         }
+      });
+      // allow setBufferCallback(any BufferCallback) on file1 and file2 any number of times
+      file1.setBufferCallback(EasyMock.isA(BufferCallback.class));
+      EasyMock.expectLastCall().anyTimes();
+      
+      file2.setBufferCallback(EasyMock.isA(BufferCallback.class));
+      EasyMock.expectLastCall().anyTimes();
+      
+      
+
       EasyMock.expect(file1.getAlignment()).andStubReturn(1);
       EasyMock.expect(file2.getAlignment()).andStubReturn(1);
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -25,6 +25,7 @@
 
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
@@ -83,6 +84,7 @@
             }
             catch (Exception e)
             {
+               e.printStackTrace();
                this.e = e;
             }
          }
@@ -90,9 +92,9 @@
       
       LocalThread t = new LocalThread();
       t.start();
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
       
-      latch.await();
-      
       Thread.yield();
 
       assertTrue(t.isAlive());

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-08-20 01:19:54 UTC (rev 4844)
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -101,6 +102,22 @@
       return sf;
    }
 
+   public void cleanBuffer(final ByteBuffer buffer) // zero-fills the buffer up to its capacity, then restores the original limit
+   {
+      final int limit = buffer.limit();
+      final int capacity = buffer.capacity();
+      buffer.limit(capacity); // widen the limit so the puts below can reach every byte
+      buffer.rewind();
+      // overwrite positions 0..capacity-1 with zero
+      for (int i = 0; i < capacity; i++)
+      {
+         buffer.put((byte)0);
+      }
+      
+      buffer.limit(limit); // put the limit back to the value it had on entry
+      buffer.rewind(); // leave the position at 0
+   }
+   
    public List<String> listFiles(final String extension)
    {
       List<String> files = new ArrayList<String>();
@@ -140,7 +157,7 @@
       return ByteBuffer.allocateDirect(size);
    }
    
-   public int calculateBlockSize(int position) throws Exception
+   public int calculateBlockSize(int position)
    {
       int alignment = getAlignment();
       
@@ -229,7 +246,7 @@
       final IOCallback callback;
       volatile boolean sendError;
       
-      CallbackRunnable(FakeSequentialFile file, ByteBuffer bytes, IOCallback callback)
+      CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCallback callback)
       {
          this.file = file;
          this.bytes = bytes;
@@ -245,8 +262,21 @@
          }
          else
          {
-            file.data.put(bytes);
-            if (callback!=null) callback.done();
+            try
+            {
+               file.data.put(bytes); // copy the remaining bytes of the write into the fake file's data buffer
+               if (callback!=null) callback.done();
+               // then pass the same buffer to the file's BufferCallback, if one was set
+               if (file.bufferCallback != null)
+               {
+                  file.bufferCallback.bufferDone(bytes);
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               if (callback != null) callback.onError(-1, e.getMessage()); // callback is optional (see the guard above); avoid an NPE that would hide e
+            }
          }
       }
 
@@ -269,7 +299,9 @@
       private final String fileName;
       
       private ByteBuffer data;
-      
+
+      private BufferCallback bufferCallback;
+
       public ByteBuffer getData()
       {
          return data;
@@ -323,6 +355,11 @@
          checkAndResize(0);
       }
 
+      public void setBufferCallback(BufferCallback callback) // stores the callback; CallbackRunnable calls its bufferDone(bytes) after copying a write
+      {
+         this.bufferCallback = callback; // may be null: CallbackRunnable checks for null before calling it
+      }
+
       public void fill(final int pos, final int size, final byte fillCharacter) throws Exception
       {     
          if (!open)
@@ -397,7 +434,7 @@
          
          checkAlignment(bytes.limit());
          
-         checkAndResize(bytes.capacity() + position);
+         checkAndResize(bytes.limit() + position);
          
          CallbackRunnable action = new CallbackRunnable(this, bytes, callback);
          




More information about the jboss-cvs-commits mailing list