[jboss-cvs] JBoss Messaging SVN: r7134 - in branches/Branch_JBM2_Perf_Clebert: src/main/org/jboss/messaging/core/journal/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 29 17:44:57 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-29 17:44:57 -0400 (Fri, 29 May 2009)
New Revision: 7134

Modified:
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:


Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -40,6 +40,8 @@
    void open() throws Exception;
    
    boolean isOpen();
+   
+   void setBuffering(boolean buffering);
 
    /**
     * For certain operations (like loading) we don't need open the file with full maxIO

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -72,6 +72,8 @@
    private final TimedBuffer timedBuffer;
 
    private BufferCallback bufferCallback;
+   
+   private boolean buffering = true;
 
    /** A context switch on AIO would make it to synchronize the disk before
        switching to the new thread, what would cause
@@ -287,7 +289,14 @@
 
    public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
    {
-      timedBuffer.addBytes(bytes, callback);
+      if (buffering)
+      {
+         timedBuffer.addBytes(bytes, callback);
+      }
+      else
+      {
+         doWrite(bytes, callback);
+      }
    }
 
    public void write(final ByteBuffer bytes, final boolean sync) throws Exception
@@ -311,6 +320,19 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+    */
+   public void setBuffering(boolean buffering)
+   {
+      this.buffering = buffering;
+      if (!buffering)
+      {
+         timedBuffer.flush();
+      }
+   };
+
+
    public void sync() throws Exception
    {
       throw new IllegalArgumentException("This method is not supported on AIO");
@@ -341,11 +363,9 @@
    // Private methods
    // -----------------------------------------------------------------------------------------------------
 
-   private void execWrite(final ByteBuffer bytes, final IOCallback callback)
+   private void doWrite(final ByteBuffer bytes, final IOCallback callback)
    {
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-      
-      bytes.limit(bytesToWrite);
 
       final long positionToWrite = position.getAndAdd(bytesToWrite);
 
@@ -462,7 +482,7 @@
          }
          else
          {
-            execWrite(buffer, new DelegateCallback(callbacks));
+            doWrite(buffer, new DelegateCallback(callbacks));
          }
       }
 
@@ -482,7 +502,6 @@
          }
       }
 
-   };
+   }
 
-
 }

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -1560,6 +1560,8 @@
 
       SequentialFile sf = file.getFile();
 
+      sf.setBuffering(false);
+
       sf.open(1);
 
       ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
@@ -1568,6 +1570,8 @@
 
       sf.write(bb, true);
 
+      sf.setBuffering(true);
+
       JournalFile jf = new JournalFileImpl(sf, newOrderingID);
 
       sf.position(bb.limit());

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -244,4 +244,11 @@
       return "NIOSequentialFile " + file;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+    */
+   public void setBuffering(boolean buffering)
+   {
+   }
+
 }

Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -24,11 +24,8 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
@@ -80,104 +77,4 @@
       factory.releaseBuffer(buff);
    }
 
-   public void testBlockCallback() throws Exception
-   {
-      class BlockCallback implements IOCallback
-      {
-         AtomicInteger countDone = new AtomicInteger(0);
-
-         AtomicInteger countError = new AtomicInteger(0);
-
-         CountDownLatch blockLatch;
-
-         BlockCallback()
-         {
-            blockLatch = new CountDownLatch(1);
-         }
-
-         public void release()
-         {
-            blockLatch.countDown();
-         }
-
-         public void done()
-         {
-            try
-            {
-               blockLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-
-            countDone.incrementAndGet();
-         }
-
-         public void onError(final int errorCode, final String errorMessage)
-         {
-            try
-            {
-               blockLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-
-            countError.incrementAndGet();
-         }
-      }
-
-      BlockCallback callback = new BlockCallback();
-
-      final int NUMBER_OF_RECORDS = 500;
-
-      SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
-      file.open();
-      file.fill(0, 512 * NUMBER_OF_RECORDS, (byte)'a');
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-         ByteBuffer buffer = factory.newBuffer(512);
-
-         buffer.putInt(i + 10);
-
-         for (int j = buffer.position(); j < buffer.limit(); j++)
-         {
-            buffer.put((byte)'b');
-         }
-
-         file.write(buffer, callback);
-      }
-
-      callback.release();
-      file.close();
-      assertEquals(NUMBER_OF_RECORDS, callback.countDone.get());
-      assertEquals(0, callback.countError.get());
-
-      file.open();
-
-      ByteBuffer buffer = factory.newBuffer(512);
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-
-         file.read(buffer);
-         buffer.rewind();
-
-         int recordRead = buffer.getInt();
-
-         assertEquals(i + 10, recordRead);
-
-         for (int j = buffer.position(); j < buffer.limit(); j++)
-         {
-            assertEquals((byte)'b', buffer.get());
-         }
-
-      }
-
-      file.close();
-   }
-
 }

Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -120,7 +120,7 @@
          testAddDeleteJournal(new AIOSequentialFileFactory(getTestDir(), 100 * 1024, 2),
                               NUM_RECORDS,
                               SIZE_RECORD,
-                              13,
+                              20,
                               10 * 1024 * 1024);
 
       }
@@ -139,7 +139,7 @@
          testAddDeleteJournal(new NIOSequentialFileFactory(getTestDir()),
                               NUM_RECORDS,
                               SIZE_RECORD,
-                              13,
+                              20,
                               10 * 1024 * 1024);
       }
    }
@@ -250,6 +250,58 @@
       }
    }
 
+   public void testDeleteme() throws Exception
+   {
+
+      JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+                                            // size.. not an exact science here
+                                            10, // number of files pre-allocated
+                                            true, // sync on commit
+                                            false, // no sync on non transactional
+                                            new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+                                            "jbm", // file name
+                                            "jbm", // extension
+                                            500); // it's like a semaphore for callback on the AIO layer
+      // this during record writes
+
+      journal.start();
+      journal.load(dummyLoader);
+
+      FakeMessage msg = new FakeMessage(1024);
+      FakeQueueEncoding update = new FakeQueueEncoding();
+
+      journal.appendAddRecord(1, (byte)1, msg);
+
+      journal.forceMoveNextFile();
+
+      journal.appendUpdateRecord(1, (byte)2, update);
+
+      journal.appendAddRecord(2, (byte)1, msg);
+
+      journal.appendDeleteRecord(1);
+
+      journal.forceMoveNextFile();
+
+      journal.appendDeleteRecord(2);
+
+      journal.stop();
+
+      journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+                                // size.. not an exact science here
+                                2, // number of files pre-allocated
+                                true, // sync on commit
+                                false, // no sync on non transactional
+                                new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+                                "jbm", // file name
+                                "jbm", // extension
+                                500); // it's like a semaphore for callback on the AIO layer
+      // this during record writes
+
+      journal.start();
+      journal.load(dummyLoader);
+
+   }
+
    public void testJournal(SequentialFileFactory fileFactory, long records, int size, int numberOfFiles, int journalSize) throws Exception
    {
 
@@ -302,12 +354,16 @@
    {
       final int size;
 
-      byte buffer[];
+      byte bytes[];
 
       FakeMessage(int size)
       {
          this.size = size;
-         buffer = new byte[size];
+         bytes = new byte[size];
+         for (int i = 0; i < size; i++)
+         {
+            bytes[i] = (byte)'a';
+         }
       }
 
       public void decode(MessagingBuffer buffer)
@@ -316,7 +372,7 @@
 
       public void encode(MessagingBuffer buffer)
       {
-         buffer.writeBytes(new byte[size]);
+         buffer.writeBytes(this.bytes);
       }
 
       /* (non-Javadoc)
@@ -331,27 +387,21 @@
 
    private static class FakeQueueEncoding implements EncodingSupport
    {
-      long queueID;
 
-      public FakeQueueEncoding(final long queueID)
-      {
-         super();
-         this.queueID = queueID;
-      }
-
       public FakeQueueEncoding()
       {
-         super();
       }
 
       public void decode(final MessagingBuffer buffer)
       {
-         queueID = buffer.readLong();
       }
 
       public void encode(final MessagingBuffer buffer)
       {
-         buffer.writeLong(queueID);
+         for (int i = 0 ; i < 8; i++)
+         {
+            buffer.writeByte((byte)'q');
+         }
       }
 
       public int getEncodeSize()

Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-05-29 21:44:57 UTC (rev 7134)
@@ -570,6 +570,13 @@
          return data.position() + size <= data.limit();
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+       */
+      public void setBuffering(boolean buffering)
+      {
+      }
+
    }
 
    /* (non-Javadoc)
@@ -592,7 +599,6 @@
     */
    public BufferCallback getBufferCallback()
    {
-      // TODO Auto-generated method stub
       return null;
    }
 




More information about the jboss-cvs-commits mailing list