[jboss-cvs] JBoss Messaging SVN: r7165 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 2 14:33:56 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-02 14:33:56 -0400 (Tue, 02 Jun 2009)
New Revision: 7165

Added:
   trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java
Modified:
   trunk/native/src/LibAIOController.cpp
   trunk/native/src/disktest.cpp
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java
   trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
   trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Log:
AIO performance fixes

Modified: trunk/native/src/LibAIOController.cpp
===================================================================
--- trunk/native/src/LibAIOController.cpp	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/native/src/LibAIOController.cpp	2009-06-02 18:33:56 UTC (rev 7165)
@@ -88,7 +88,7 @@
 		
 		if (buffer == 0)
 		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
 			return;
 		}
 		
@@ -117,7 +117,7 @@
 	
 	if (buffer == 0)
 	{
-		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
 		return;
 	}
 	
@@ -176,7 +176,7 @@
 
 		if (buffer == 0)
 		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
 			return;
 		}
 		

Modified: trunk/native/src/disktest.cpp
===================================================================
--- trunk/native/src/disktest.cpp	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/native/src/disktest.cpp	2009-06-02 18:33:56 UTC (rev 7165)
@@ -21,6 +21,10 @@
 
 }
 
+/**
+ * Authored by Clebert Suconic @ redhat . com
+ * Licensed under LGPL
+ */
 int main(int arg, char * param[])
 {
    char * directory;

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -47,7 +47,7 @@
 public class TimedBuffer
 {
    // Constants -----------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(TimedBuffer.class);
 
    // Attributes ----------------------------------------------------
@@ -64,12 +64,12 @@
 
    private final ByteBuffer currentBuffer;
 
-   private final List<AIOCallback> callbacks;
+   private volatile List<AIOCallback> callbacks;
 
    private volatile long timeLastWrite = 0;
 
    private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
-   
+
    private Lock lock = new ReentrantReadWriteLock().writeLock();
 
    // Static --------------------------------------------------------
@@ -78,14 +78,15 @@
 
    // Public --------------------------------------------------------
 
-   //private byte[] data;
-   
+   // private byte[] data;
+
    public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
    {
       bufferSize = size;
       this.bufferObserver = bufferObserver;
-      this.timeout = timeout;      
+      this.timeout = timeout;
       this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
+      this.currentBuffer.limit(0);
       this.callbacks = new ArrayList<AIOCallback>();
    }
 
@@ -116,12 +117,12 @@
          }
       }
    }
-      
+
    public void lock()
    {
       lock.lock();
    }
-   
+
    public void unlock()
    {
       lock.unlock();
@@ -136,22 +137,33 @@
    {
       if (sizeChecked > bufferSize)
       {
+         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+                                         ") on the journal");
+      }
+
+      
+      if (currentBuffer.limit() == 0 ||  currentBuffer.position() + sizeChecked > currentBuffer.limit())
+      {
          flush();
+
+         final int remaining = bufferObserver.getRemainingBytes();
          
-         currentBuffer.rewind();
-      }
-      else
-      {
-         // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
-         if (currentBuffer.position() + sizeChecked > currentBuffer.limit())
+         if (sizeChecked > remaining)
          {
-            flush();
-            
+            return false;
+         }
+         else
+         {
             currentBuffer.rewind();
+            currentBuffer.limit(Math.min(remaining, bufferSize));
+            return true;
          }
       }
+      else
+      {
+         return true;
+      }
 
-      return true;
    }
 
    public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
@@ -174,17 +186,17 @@
 
    public synchronized void flush()
    {
-      if (currentBuffer != null)
+      if (currentBuffer.limit() > 0)
       {
-         ByteBuffer directBuffer = bufferObserver.newBuffer(currentBuffer.capacity(), currentBuffer.capacity());
+         ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, currentBuffer.position());
+
+         currentBuffer.flip();
          
          directBuffer.put(currentBuffer);
-         
+
          bufferObserver.flushBuffer(directBuffer, callbacks);
-
-         currentBuffer.rewind();
          
-         callbacks.clear();
+         callbacks = new ArrayList<AIOCallback>();
       }
 
       if (futureTimerRunnable != null)
@@ -194,6 +206,7 @@
       }
 
       timeLastWrite = 0;
+      currentBuffer.limit(0);
    }
 
    // Package protected ---------------------------------------------
@@ -201,7 +214,7 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-      
+
    // Inner classes -------------------------------------------------
 
    class CheckTimer implements Runnable

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -51,9 +51,12 @@
    public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
    
    
-   /** Return a buffer, with any bufferSize up to bufferSize, as long as it fits the current file */
-   public ByteBuffer newBuffer(int minSize, int maxSize);
+   /** Return the number of remaining bytes that still fit on the observer (file) */
+   public int getRemainingBytes();
    
+   
+   public ByteBuffer newBuffer(int size, int limit);
+   
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -34,5 +34,5 @@
  */
 public interface IOCallback extends AIOCallback
 {
-
+   void waitCompletion() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -295,11 +295,11 @@
 
    public int read(final ByteBuffer bytes) throws Exception
    {
-      WaitCompletion waitCompletion = new WaitCompletion();
+      IOCallback waitCompletion = SimpleWaitIOCallback.getInstance();
 
       int bytesRead = read(bytes, waitCompletion);
 
-      waitCompletion.waitLatch();
+      waitCompletion.waitCompletion();
 
       return bytesRead;
    }
@@ -320,11 +320,11 @@
    {
       if (sync)
       {
-         WaitCompletion completion = new WaitCompletion();
+         IOCallback completion = SimpleWaitIOCallback.getInstance();
 
          write(bytes, completion);
          
-         completion.waitLatch();
+         completion.waitCompletion();
       }
       else
       {
@@ -391,56 +391,8 @@
          throw new IllegalStateException("File not opened");
       }
    }
-
-   private static class DummyCallback implements IOCallback
-   {
-      static DummyCallback instance = new DummyCallback();
-
-      public void done()
-      {
-      }
-
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception(errorMessage));
-      }
-   }
-
-   private static class WaitCompletion implements IOCallback
-   {
-      private final CountDownLatch latch = new CountDownLatch(1);
-
-      private volatile String errorMessage;
-
-      private volatile int errorCode = 0;
-
-      public void done()
-      {
-         latch.countDown();
-      }
-
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         this.errorCode = errorCode;
-
-         this.errorMessage = errorMessage;
-
-         log.warn("Error Message " + errorMessage);
-
-         latch.countDown();
-      }
-
-      public void waitLatch() throws Exception
-      {
-         latch.await();
-         if (errorMessage != null)
-         {
-            throw new MessagingException(errorCode, errorMessage);
-         }
-         return;
-      }
-   }
-
+   
+   
    private static class DelegateCallback implements IOCallback
    {
       final List<AIOCallback> delegates;
@@ -479,6 +431,10 @@
             }
          }
       }
+
+      public void waitCompletion() throws Exception
+      {
+      }
    }
 
    class LocalBufferObserver implements TimedBufferObserver
@@ -498,19 +454,25 @@
          }
       }
 
-      public ByteBuffer newBuffer(int minSize, int size)
+      public ByteBuffer newBuffer(int size, int limit)
       {
          size = factory.calculateBlockSize(size);
-         
-         long availableSize = fileSize - position.get();
-         
-         if (availableSize == 0 || availableSize < minSize)
+         limit = factory.calculateBlockSize(limit);
+
+         ByteBuffer buffer = factory.newBuffer(size);
+         buffer.limit(limit);
+         return buffer;
+      }
+
+      public int getRemainingBytes()
+      {
+         if (fileSize - position.get() > Integer.MAX_VALUE)
          {
-            return null;
+            return Integer.MAX_VALUE;
          }
          else
          {
-            return factory.newBuffer((int)Math.min(size, availableSize));
+            return (int)(fileSize - position.get());
          }
       }
 

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * A DummyCallback
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public  class DummyCallback implements IOCallback
+{
+   static DummyCallback instance = new DummyCallback();
+   
+   private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
+   
+   public static IOCallback getInstance()
+   {
+      return instance;
+   }
+
+   public void done()
+   {
+   }
+
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception(errorMessage));
+   }
+
+   public void waitCompletion() throws Exception
+   {
+   }
+}
+

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -273,7 +273,7 @@
 
    public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
    {
-      appendAddRecord(id, recordType, new ByteArrayEncoding(record));
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), syncNonTransactional);
    }
 
    public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
@@ -301,11 +301,13 @@
       bb.writeByte(recordType);
       record.encode(bb);
       bb.writeInt(size);
+      
+      IOCallback callback = getSyncCallback(sync);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
 
          posFilesMap.put(id, new PosFiles(usedFile));
       }
@@ -313,6 +315,11 @@
       {
          lock.unlock();
       }
+      
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
    }
 
    public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -346,10 +353,13 @@
       record.encode(bb);
       bb.writeInt(size);
 
+      
+      IOCallback callback = getSyncCallback(syncNonTransactional);
+      
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, callback);
 
          posFiles.addUpdateFile(usedFile);
       }
@@ -357,6 +367,11 @@
       {
          lock.unlock();
       }
+      
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
    }
 
    public void appendDeleteRecord(final long id) throws Exception
@@ -381,11 +396,13 @@
       bb.putInt(-1); // skip ID part
       bb.putLong(id);
       bb.putInt(size);
+      
+      IOCallback callback = getSyncCallback(syncNonTransactional);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+         JournalFile usedFile = appendRecord(bb, syncNonTransactional, callback);
 
          posFiles.addDelete(usedFile);
       }
@@ -393,6 +410,11 @@
       {
          lock.unlock();
       }
+      
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
    }
 
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -586,7 +608,7 @@
 
       ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
 
-      TransactionCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID);
 
       lock.lock();
       try
@@ -640,7 +662,7 @@
 
       ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
 
-      TransactionCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID);
 
       lock.lock();
       try
@@ -687,7 +709,7 @@
       bb.putLong(txID);
       bb.putInt(size);
 
-      TransactionCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID);
 
       lock.lock();
       try
@@ -1952,7 +1974,7 @@
     * Note: This method will perform rwlock.readLock.lock(); 
     *       The method caller should aways unlock that readLock
     * */
-   private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
+   private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
    {
       lock.lock();
 
@@ -1979,6 +2001,13 @@
             currentFile.getFile().unlockBuffer();
             moveNextFile();
             currentFile.getFile().lockBuffer();
+            
+            // The same check needs to be done at the new file also
+            if (!currentFile.getFile().fits(size))
+            {
+               // Sanity check, this should never happen
+               throw new IllegalStateException("Invalid logic on buffer allocation");
+            }
          }
 
          if (currentFile == null)
@@ -1996,6 +2025,7 @@
          {
             currentFile.getFile().write(bb, callback);
 
+            // This defaults to false. The user is telling us not to wait for the buffer timeout when a commit or sync is called
             if (flushOnSync && sync)
             {
                currentFile.getFile().flush();
@@ -2212,8 +2242,30 @@
 
       return tx;
    }
+   
+   
+   private IOCallback getSyncCallback(boolean sync)
+   {
+      if (fileFactory.isSupportsCallbacks())
+      {
+         if (sync)
+         {
+            return SimpleWaitIOCallback.getInstance();
+         }
+         else
+         {
+            return DummyCallback.getInstance();
+         }
+      }
+      else
+      {
+         return null;
+      }
+   }
 
-   private TransactionCallback getTransactionCallback(final long transactionId) throws MessagingException
+
+
+   private IOCallback getTransactionCallback(final long transactionId) throws MessagingException
    {
       if (fileFactory.isSupportsCallbacks() && syncTransactional)
       {

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * A SimpleWaitIOCallback
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SimpleWaitIOCallback implements IOCallback
+{
+
+   private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
+
+   private final CountDownLatch latch = new CountDownLatch(1);
+
+   private volatile String errorMessage;
+
+   private volatile int errorCode = 0;
+
+   public static IOCallback getInstance()
+   {
+      return new SimpleWaitIOCallback();
+   }
+
+
+   public void done()
+   {
+      latch.countDown();
+   }
+
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      this.errorCode = errorCode;
+
+      this.errorMessage = errorMessage;
+
+      log.warn("Error Message " + errorMessage);
+
+      latch.countDown();
+   }
+
+   public void waitCompletion() throws Exception
+   {
+      latch.await();
+      if (errorMessage != null)
+      {
+         throw new MessagingException(errorCode, errorMessage);
+      }
+      return;
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -119,6 +119,7 @@
       // exceptions)
       for (int i = 0; i < 100; i++)
       {
+         System.out.println("i = " + i);
          journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'));
       }
       stopJournal();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -63,4 +63,8 @@
       latch.await();
    }
 
+   public void waitCompletion() throws Exception
+   {
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-02 18:33:56 UTC (rev 7165)
@@ -72,6 +72,7 @@
       final AtomicInteger flushTimes = new AtomicInteger(0);
       class TestObserver implements TimedBufferObserver
       {
+         //TODO: fix the test
          public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
          {
             buffers.add(buffer);
@@ -85,6 +86,11 @@
          {
             return ByteBuffer.allocate(maxSize);
          }
+
+         public int getRemainingBytes()
+         {
+            return 1024*1024;
+         }
       }
       
       TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout




More information about the jboss-cvs-commits mailing list