[jboss-cvs] JBoss Messaging SVN: r7300 - in trunk: src/main/org/jboss/messaging/core/journal and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 10 18:27:05 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-10 18:27:05 -0400 (Wed, 10 Jun 2009)
New Revision: 7300

Modified:
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Improvements on journal

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -31,7 +31,9 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.VariableLatch;
 
 /**
@@ -52,7 +54,7 @@
    private TimedBufferObserver bufferObserver;
    
    // Some kernels don't have good resolutions on timers.. I've set this to disabled.. we may decide later
-   private static final boolean USE_NATIVE_TIMERS = false;
+   private static final boolean USE_NATIVE_TIMERS = true;
 
    // This is used to pause and resume the timer
    // This is a reusable Latch, that uses java.util.concurrent base classes
@@ -62,15 +64,17 @@
 
    private final int bufferSize;
 
-   private final ByteBuffer currentBuffer;
+   private final MessagingBuffer buffer;
 
+   private int bufferLimit = 0;
+
    private List<AIOCallback> callbacks;
 
    private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
    // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
    private volatile boolean active = false;
-   
+
    private final long timeout;
 
    // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
@@ -81,19 +85,19 @@
    private volatile boolean started;
 
    private final boolean flushOnSync;
-   
+
    // for logging write rates
-   
+
    private final boolean logRates;
-   
+
    private volatile long bytesFlushed;
-   
+
    private Timer logRatesTimer;
-   
+
    private TimerTask logRatesTimerTask;
-   
+
    private long lastExecution;
-      
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -109,7 +113,7 @@
          this.logRatesTimer = new Timer(true);
       }
       // Setting the interval for nano-sleeps
-      
+
       // We are keeping this disabled for now until we figure out what to do.
       // I've found a few problems with nano-sleep depending on the version of the kernel:
       // http://fixunix.com/unix/552033-problem-nanosleep.html
@@ -117,9 +121,11 @@
       {
          AsynchronousFileImpl.setNanoSleepInterval((int)timeout);
       }
-      
-      currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
-      currentBuffer.limit(0);
+
+      buffer = ChannelBuffers.buffer(bufferSize);
+      buffer.clear();
+      bufferLimit = 0;
+
       callbacks = new ArrayList<AIOCallback>();
       this.flushOnSync = flushOnSync;
       latchTimer.up();
@@ -138,11 +144,11 @@
       timerThread = new Thread(timerRunnable, "jbm-aio-timer");
 
       timerThread.start();
-      
+
       if (logRates)
       {
          logRatesTimerTask = new LogRatesTimerTask();
-         
+
          logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
       }
 
@@ -155,15 +161,15 @@
       {
          return;
       }
-      
+
       this.flush();
-      
+
       this.bufferObserver = null;
-      
+
       latchTimer.down();
 
       timerRunnable.close();
-      
+
       if (logRates)
       {
          logRatesTimerTask.cancel();
@@ -216,7 +222,7 @@
                                          ") on the journal");
       }
 
-      if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
+      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
       {
          flush();
 
@@ -228,8 +234,8 @@
          }
          else
          {
-            currentBuffer.rewind();
-            currentBuffer.limit(Math.min(remaining, bufferSize));
+            buffer.clear();
+            bufferLimit = Math.min(remaining, bufferSize);
             return true;
          }
       }
@@ -239,15 +245,16 @@
       }
    }
 
-   public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
+   public synchronized void addBytes(final byte[] bytes, final boolean sync, final AIOCallback callback)
    {
-      if (currentBuffer.position() == 0)
+      if (buffer.writerIndex() == 0)
       {
          // Resume latch
          latchTimer.down();
       }
-      
-      currentBuffer.put(bytes);
+
+      buffer.writeBytes(bytes);
+
       callbacks.add(callback);
 
       active = true;
@@ -268,7 +275,7 @@
          }
       }
 
-      if (currentBuffer.position() == currentBuffer.capacity())
+      if (buffer.writerIndex() == bufferLimit)
       {
          flush();
       }
@@ -276,34 +283,33 @@
 
    public synchronized void flush()
    {
-      if (currentBuffer.limit() > 0)
+      if (buffer.writerIndex() > 0)
       {
          latchTimer.up();
-         
-         int pos = currentBuffer.position();
-         
+
+         int pos = buffer.writerIndex();
+
          if (logRates)
          {
-            bytesFlushed += pos;                     
+            bytesFlushed += pos;
          }
 
          ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
 
          // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-         // Using directBuffer.put(currentBuffer) would make several append calls for each byte
-         
-         //TODO we could optimise this further by making currentBuffer a simple byte[] and using System.arrayCopy to add bytes to it
-         //then we wouldn't need the array() call
-         directBuffer.put(currentBuffer.array(), 0, pos);
+         // Using directBuffer.put(buffer) would make several append calls for each byte
 
+         directBuffer.put(buffer.array(), 0, pos);
+
          bufferObserver.flushBuffer(directBuffer, callbacks);
 
          callbacks = new ArrayList<AIOCallback>();
 
          active = false;
          pendingSync = false;
-                  
-         currentBuffer.limit(0);
+
+         buffer.clear();
+         bufferLimit = 0;
       }
    }
 
@@ -342,35 +348,34 @@
    private class LogRatesTimerTask extends TimerTask
    {
       private boolean closed;
-      
+
       @Override
       public synchronized void run()
       {
          if (!closed)
          {
             long now = System.currentTimeMillis();
-            
+
             if (lastExecution != 0)
-            {                             
-               double rate = 1000 * ((double)bytesFlushed) / ( now  - lastExecution);
-               
-               log.info("Write rate = " + rate + " bytes / sec");                                      
+            {
+               double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+               log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
             }
 
             lastExecution = now;
-            
+
             bytesFlushed = 0;
          }
       }
-      
+
       public synchronized boolean cancel()
       {
          closed = true;
-         
+
          return super.cancel();
       }
    }
-   
+
    private class CheckTimer implements Runnable
    {
       private volatile boolean closed = false;
@@ -381,7 +386,7 @@
          {
             Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
          }
-         
+
          while (!closed)
          {
             try
@@ -391,7 +396,7 @@
             catch (InterruptedException ignored)
             {
             }
-            
+
             sleep();
 
             checkTimer();

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -24,6 +24,8 @@
 
 import java.nio.ByteBuffer;
 
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
 /**
  * 
  * A SequentialFile
@@ -60,6 +62,10 @@
 
    void delete() throws Exception;
 
+   void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+
+   void write(MessagingBuffer bytes, boolean sync) throws Exception;
+
    void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
 
    void write(ByteBuffer bytes, boolean sync) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  * 
@@ -301,11 +302,43 @@
       return bytesRead;
    }
 
+   public void write(final MessagingBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.addBytes(bytes.array(), sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+         buffer.put(bytes.array());
+         doWrite(buffer, callback);
+      }
+   }
+
+   public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
+   {
+      if (sync)
+      {
+         IOCallback completion = SimpleWaitIOCallback.getInstance();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.instance);
+      }
+   }
+   
+   
    public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
    {
       if (timedBuffer != null)
       {
-         timedBuffer.addBytes(bytes, sync, callback);
+         // sanity check.. it shouldn't happen
+         throw new IllegalStateException("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
       }
       else
       {
@@ -329,6 +362,7 @@
       }
    }
 
+
    public void sync() throws Exception
    {
       throw new IllegalArgumentException("This method is not supported on AIO");

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -204,9 +204,14 @@
 
    public void stop()
    {
-      buffersControl.clearPoll();
+      buffersControl.stop();
       timedBuffer.stop();
    }
+   
+   protected void finalize()
+   {
+      this.stop();
+   }
 
    /** Class that will control buffer-reuse */
    private class ReuseBuffersController
@@ -220,6 +225,8 @@
 
       /** During reload we may disable/enable buffer reuse */
       private boolean enabled = true;
+      
+      private boolean stopped = false;
 
       final BufferCallback callback = new LocalBufferCallback();
 
@@ -284,8 +291,14 @@
          }
       }
 
-      public void clearPoll()
+      public synchronized void stop()
       {
+         stopped = true;
+         clearPoll();
+      }
+      
+      public synchronized void clearPoll()
+      {
          ByteBuffer reusedBuffer;
 
          while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
@@ -298,19 +311,31 @@
       {
          public void bufferDone(final ByteBuffer buffer)
          {
-            if (enabled)
+            synchronized (ReuseBuffersController.this)
             {
-               bufferReuseLastTime = System.currentTimeMillis();
-
-               // If a buffer has any other than the configured bufferSize, the buffer
-               // will be just sent to GC
-               if (buffer.capacity() == bufferSize)
+               if (stopped)
                {
-                  reuseBuffersQueue.offer(buffer);
+                  System.out.println("Releasing buffer after stopped");
+                  releaseBuffer(buffer);
                }
                else
                {
-                  releaseBuffer(buffer);
+                  
+                  if (enabled)
+                  {
+                     bufferReuseLastTime = System.currentTimeMillis();
+      
+                     // If a buffer has any other than the configured bufferSize, the buffer
+                     // will be just sent to GC
+                     if (buffer.capacity() == bufferSize)
+                     {
+                        reuseBuffersQueue.offer(buffer);
+                     }
+                     else
+                     {
+                        releaseBuffer(buffer);
+                     }
+                  }
                }
             }
          }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -43,10 +43,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -191,7 +190,7 @@
 
    private ExecutorService filesExecutor = null;
 
-   private final Lock lock = new ReentrantReadWriteLock().writeLock();
+   private final Semaphore lock = new Semaphore(1);
 
    private volatile JournalFile currentFile;
 
@@ -271,7 +270,7 @@
 
       int size = SIZE_ADD_RECORD + recordLength;
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(ADD_RECORD);
       bb.writeInt(-1); // skip ID part
@@ -283,16 +282,16 @@
       
       IOCallback callback = getSyncCallback(sync);
 
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          posFilesMap.put(id, new PosFiles(usedFile));
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
       
       if (callback != null)
@@ -322,7 +321,7 @@
 
       int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(UPDATE_RECORD);
       bb.writeInt(-1); // skip ID part
@@ -335,16 +334,16 @@
       
       IOCallback callback = getSyncCallback(sync);
       
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          posFiles.addUpdateFile(usedFile);
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
       
       if (callback != null)
@@ -369,16 +368,16 @@
 
       int size = SIZE_DELETE_RECORD;
 
-      ByteBuffer bb = newBuffer(size);
+      ChannelBuffer bb = newBuffer(size);
 
-      bb.put(DELETE_RECORD);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(id);
-      bb.putInt(size);
+      bb.writeByte(DELETE_RECORD);
+      bb.writeInt(-1); // skip ID part
+      bb.writeLong(id);
+      bb.writeInt(size);
       
       IOCallback callback = getSyncCallback(sync);
 
-      lock.lock();
+      lock.acquire();
       try
       {
          JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -387,7 +386,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
       
       if (callback != null)
@@ -418,7 +417,7 @@
 
       int size = SIZE_ADD_RECORD_TX + recordLength;
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(ADD_RECORD_TX);
       bb.writeInt(-1); // skip ID part
@@ -429,10 +428,10 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -440,7 +439,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
    }
 
@@ -466,7 +465,7 @@
 
       int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(UPDATE_RECORD_TX);
       bb.writeInt(-1); // skip ID part
@@ -477,10 +476,10 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -488,7 +487,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
    }
 
@@ -508,7 +507,7 @@
 
       int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(DELETE_RECORD_TX);
       bb.writeInt(-1); // skip ID part
@@ -521,10 +520,10 @@
       }
       bb.writeInt(size);
 
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -532,7 +531,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
    }
 
@@ -546,7 +545,7 @@
 
       int size = SIZE_DELETE_RECORD_TX;
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(DELETE_RECORD_TX);
       bb.writeInt(-1); // skip ID part
@@ -555,10 +554,10 @@
       bb.writeInt(0);
       bb.writeInt(size);
 
-      lock.lock();
+      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -566,7 +565,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
    }
 
@@ -592,11 +591,11 @@
 
       JournalTransaction tx = getTransactionInfo(txID);
 
-      ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+      ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
 
       IOCallback callback = getTransactionCallback(txID, sync);
 
-      lock.lock();
+      lock.acquire();
       try
       {
          JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -605,7 +604,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
 
       // We should wait this outside of the lock, to increase throughput
@@ -646,11 +645,11 @@
          throw new IllegalStateException("Cannot find tx with id " + txID);
       }
 
-      ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+      ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
 
       IOCallback callback = getTransactionCallback(txID, sync);
 
-      lock.lock();
+      lock.acquire();
       try
       {
          JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -661,7 +660,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -688,16 +687,16 @@
 
       int size = SIZE_ROLLBACK_RECORD;
 
-      ByteBuffer bb = newBuffer(size);
+      ChannelBuffer bb = newBuffer(size);
 
-      bb.put(ROLLBACK_RECORD);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(txID);
-      bb.putInt(size);
+      bb.writeByte(ROLLBACK_RECORD);
+      bb.writeInt(-1); // skip ID part
+      bb.writeLong(txID);
+      bb.writeInt(size);
 
       IOCallback callback = getTransactionCallback(txID, sync);
 
-      lock.lock();
+      lock.acquire();
       try
       {
          JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -708,7 +707,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -1573,8 +1572,16 @@
    // In some tests we need to force the journal to move to a next file
    public void forceMoveNextFile() throws Exception
    {
-      moveNextFile();
-      debugWait();
+      lock.acquire();
+      try
+      {
+         moveNextFile();
+         debugWait();
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void perfBlast(final int pages) throws Exception
@@ -1613,7 +1620,7 @@
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      lock.lock();
+      lock.acquire();
 
       try
       {
@@ -1648,7 +1655,7 @@
       }
       finally
       {
-         lock.unlock();
+         lock.release();
       }
    }
 
@@ -1798,7 +1805,7 @@
     * @return
     * @throws Exception
     */
-   private ByteBuffer writeTransaction(final byte recordType,
+   private ChannelBuffer writeTransaction(final byte recordType,
                                        final long txID,
                                        final JournalTransaction tx,
                                        final EncodingSupport transactionData) throws Exception
@@ -1808,7 +1815,7 @@
                  2 +
                  (transactionData != null ? transactionData.getEncodeSize() + SIZE_INT : 0);
 
-      ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+      ChannelBuffer bb = newBuffer(size);
 
       bb.writeByte(recordType);
       bb.writeInt(-1); // skip ID part
@@ -1834,7 +1841,7 @@
 
       bb.writeInt(size);
 
-      return bb.toByteBuffer();
+      return bb;
    }
 
    private boolean isTransaction(final byte recordType)
@@ -1953,13 +1960,10 @@
             
 
    /** 
-    * Note: This method will perform rwlock.readLock.lock(); 
-    *       The method caller should aways unlock that readLock
+    * Note: You should aways guarantee locking the semaphore lock.
     * */
-   private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+   private JournalFile appendRecord(final MessagingBuffer bb, final boolean sync, final IOCallback callback) throws Exception
    {
-      lock.lock();
-
       try
       {
          if (state != STATE_LOADED)
@@ -1967,7 +1971,7 @@
             throw new IllegalStateException("The journal was stopped");
          }
 
-         int size = bb.limit();
+         int size = bb.capacity();
 
          // We take into account the fileID used on the Header
          if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
@@ -1997,12 +2001,10 @@
             throw new IllegalStateException("Current file = null");
          }
 
-         bb.position(SIZE_BYTE);
+         bb.writerIndex(SIZE_BYTE);
 
-         bb.putInt(currentFile.getOrderingID());
+         bb.writeInt(currentFile.getOrderingID());
 
-         bb.rewind();
-
          if (callback != null)
          {
             currentFile.getFile().write(bb, sync, callback);
@@ -2017,7 +2019,6 @@
       finally
       {
          currentFile.getFile().unlockBuffer();
-         lock.unlock();
       }
 
    }
@@ -2078,20 +2079,11 @@
    // You need to guarantee lock.acquire() before calling this method
    private void moveNextFile() throws InterruptedException
    {
-      lock.lock();
-      try
-      {
-         closeFile(currentFile);
+      closeFile(currentFile);
 
-         currentFile = enqueueOpenFile();
-         
-         fileFactory.activate(currentFile.getFile());
-        
-      }
-      finally
-      {
-         lock.unlock();
-      }
+      currentFile = enqueueOpenFile();
+      
+      fileFactory.activate(currentFile.getFile());
    }
 
    /** 
@@ -2272,9 +2264,9 @@
       }
    }
 
-   public ByteBuffer newBuffer(final int size)
+   public ChannelBuffer newBuffer(final int size)
    {
-      return ByteBuffer.wrap(new byte[size]);
+      return ChannelBuffers.buffer(size);
    }
 
    // Inner classes
@@ -2582,22 +2574,16 @@
       {             
          try
          {            
-            lock.lock();
+            lock.acquire();
          
-            byte[] bytes = new byte[128 * 1024];
+            MessagingBuffer bb = newBuffer(128 * 1024);
             
             for (int i = 0; i < pages; i++)
             {
-               ByteBuffer bb = ByteBuffer.wrap(bytes);
-               
-               bb.limit(bytes.length);
-               
-               bb.position(0);
-               
                appendRecord(bb, false, null);
             }
             
-            lock.unlock();
+            lock.release();
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  * 
@@ -183,6 +184,17 @@
 
    }
 
+   
+   public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
+   {
+      write(ByteBuffer.wrap(bytes.array()), sync);
+   }
+
+   public void write(final MessagingBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+   {
+      write(ByteBuffer.wrap(bytes.array()), sync, callback);
+   }
+
    public void write(final ByteBuffer bytes, final boolean sync) throws Exception
    {
       position.addAndGet(bytes.limit());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -99,15 +99,14 @@
       int x = 0;
       for (int i = 0 ; i < 10; i++)
       {
-         ByteBuffer record = ByteBuffer.allocate(10);
+         byte[] bytes = new byte[10];
          for (int j = 0 ; j < 10; j++)
          {
-            record.put((byte)getSamplebyte(x++));
+            bytes[j] = getSamplebyte(x++);
          }
          
          timedBuffer.checkSize(10);
-         record.rewind();
-         timedBuffer.addBytes(record, false, dummyCallback);
+         timedBuffer.addBytes(bytes, false, dummyCallback);
       }
       
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-10 22:27:05 UTC (rev 7300)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  * 
@@ -595,6 +596,23 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.SequentialFile#write(org.jboss.messaging.core.remoting.spi.MessagingBuffer, boolean, org.jboss.messaging.core.journal.IOCallback)
+       */
+      public void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception
+      {
+         write(ByteBuffer.wrap(bytes.array()), sync, callback);
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.SequentialFile#write(org.jboss.messaging.core.remoting.spi.MessagingBuffer, boolean)
+       */
+      public void write(MessagingBuffer bytes, boolean sync) throws Exception
+      {
+         write(ByteBuffer.wrap(bytes.array()), sync);
+      }
+
    }
 
    /* (non-Javadoc)




More information about the jboss-cvs-commits mailing list