[hornetq-commits] JBoss hornetq SVN: r8363 - in branches/ClebertCallback: src/main/org/hornetq/core/asyncio/impl and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 21 15:37:24 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-21 15:37:23 -0500 (Sat, 21 Nov 2009)
New Revision: 8363

Modified:
   branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
sync on files

Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -41,13 +41,20 @@
     * */
    long size() throws HornetQException;
 
+   /** Some operations may need to be done only after persistence is done.
+    *  for instance, when a messaging system needs to guarantee ordering over non-persistent data, 
+    *  it needs to make sure it will only deliver the message after all the data is persisted. 
+    *  The sync won't perform any disk operation however it will wait for all the current pending operations
+    *  on this file to be finished */
+   void syncCallback(AIOCallback aioCallback);
+
    /** Any error will be reported on the callback interface */ 
    void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
 
    void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
 
    void fill(long position, int blocks, long size, byte fillChar) throws HornetQException;
-
+   
    void setBufferCallback(BufferCallback callback);
 
    int getBlockSize();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -295,9 +295,9 @@
             public void run()
             {
                writeSemaphore.acquireUninterruptibly();
+               
+               final long sequence = nextWritingSequence.getAndIncrement();
 
-               long sequence = nextWritingSequence.getAndIncrement();
-
                try
                {
                   write(handler, sequence, position, size, directByteBuffer, aioCallback);
@@ -321,7 +321,7 @@
       {
          writeSemaphore.acquireUninterruptibly();
 
-         long sequence = nextWritingSequence.getAndIncrement();
+         final long sequence = nextWritingSequence.getAndIncrement();
 
          try
          {
@@ -438,16 +438,42 @@
       }
    }
 
-   // Private ---------------------------------------------------------------------------
+   // Callback methods ------------------------------------------------------------------
 
+   public void syncCallback(final AIOCallback callback)
+   {
+      pendingWrites.up();
+      
+      writeExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            callbackLock.lock();
+
+            try
+            {
+               final long sequence = nextWritingSequence.getAndIncrement();
+
+               // This will execute the callback immediately if nothing is pending,
+               // or it will place it to the queue waiting for a response
+               executeCallback(callback, sequence);
+
+            }
+            finally
+            {
+               callbackLock.unlock();
+            }
+         }
+      });
+      
+   }
+   
    /** */
    @SuppressWarnings("unused")
    private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
    {
       writeSemaphore.release();
 
-      pendingWrites.down();
-
       callbackLock.lock();
 
       try
@@ -456,20 +482,11 @@
          if (sequence == -1)
          {
             callback.done();
+            pendingWrites.down();
          }
          else
          {
-            if (sequence == nextReadSequence)
-            {
-               nextReadSequence++;
-               callback.done();
-               flushCallbacks();
-            }
-            else
-            {
-               // System.out.println("Buffering callback");
-               pendingCallbacks.add(new CallbackHolder(sequence, callback));
-            }
+            executeCallback(callback, sequence);
          }
 
          // The buffer is not sent on callback for read operations
@@ -484,6 +501,26 @@
       }
    }
 
+   /**
+    * @param callback
+    * @param sequence
+    */
+   private void executeCallback(final AIOCallback callback, final long sequence)
+   {
+      if (sequence == nextReadSequence)
+      {
+         nextReadSequence++;
+         callback.done();
+         pendingWrites.down();
+         flushCallbacks();
+      }
+      else
+      {
+         // System.out.println("Buffering callback");
+         pendingCallbacks.add(new CallbackHolder(sequence, callback));
+      }
+   }
+
    private void flushCallbacks()
    {
       while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
@@ -498,6 +535,7 @@
          {
             holder.callback.done();
          }
+         pendingWrites.down();
          nextReadSequence++;
       }
    }
@@ -550,6 +588,8 @@
       }
    }
 
+   // Private ---------------------------------------------------------------------------
+
    private void pollEvents()
    {
       if (!opened)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -112,5 +112,7 @@
 
    void perfBlast(int pages) throws Exception;
 
+   void sync(IOCompletion callback);
 
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -80,6 +80,9 @@
 
    void sync() throws Exception;
 
+   /** This method will make sure the parameter callback will be invoked after all pending sync operations are done */
+   void syncCallback(IOAsyncTask callback);
+
    long size() throws Exception;
    
    void renameTo(String newFileName) throws Exception;

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -304,4 +304,14 @@
          throw new IllegalStateException("File not opened");
       }
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
+    */
+   @Override
+   protected void syncCallbackDirect(IOAsyncTask callback)
+   {
+      aioFile.syncCallback(callback);
+   }
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -16,7 +16,6 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOAsyncTask;
@@ -190,11 +189,30 @@
          write(bytes, false, DummyCallback.getInstance());
       }
    }
+   
+   /**
+    * invoke the callback after all pending operations are complete.
+    */
+   public void syncCallback(IOAsyncTask callback)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.syncCallback(callback);
+      }
+      else
+      {
+         syncCallbackDirect(callback);
+      }
+   }
+   
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
+   protected abstract void syncCallbackDirect(IOAsyncTask callback);
+
    protected File getFile()
    {
       return file;
@@ -252,15 +270,22 @@
    {
       public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
       {
-         buffer.flip();
-
-         if (buffer.limit() == 0)
+         if (buffer == null)
          {
-            factory.releaseBuffer(buffer);
+            syncCallbackDirect(new DelegateCallback(callbacks));
          }
          else
          {
-            writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+            buffer.flip();
+   
+            if (buffer.limit() == 0)
+            {
+               factory.releaseBuffer(buffer);
+            }
+            else
+            {
+               writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+            }
          }
       }
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -864,7 +864,31 @@
          callback.waitCompletion();
       }
    }
+   
+   public void sync(IOCompletion callback)
+   {
+      callback.lineUp();
 
+      compactingLock.readLock().lock();
+
+      try
+      {
+         lockAppend.lock();
+         try
+         {
+            currentFile.getFile().syncCallback(callback);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         compactingLock.readLock().unlock();
+      }
+   }
+
    public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
    {
       if (LOAD_TRACE)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -220,6 +220,18 @@
       internalWrite(bytes, sync, null);
    }
 
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
+    */
+   @Override
+   protected void syncCallbackDirect(IOAsyncTask callback)
+   {
+      // Nothing to be done on NIO.
+      // Timed buffer took care of everything
+      callback.done();
+   }
+   
    /**
     * @param bytes
     * @param sync
@@ -243,4 +255,5 @@
          callback.done();
       }
    }
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -225,15 +225,30 @@
          return true;
       }
    }
+   
 
-   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
+   /**
+    * This method will make sure this callback will be executed after all the pending callbacks
+    */
+   public synchronized void syncCallback(IOAsyncTask callback)
    {
-      if (buffer.writerIndex() == 0)
+      resumeTimerIfNeeded();
+      
+      callbacks.add(callback);
+      
+      pendingSync = true;
+      
+      if (flushOnSync)
       {
-         // Resume latch
-         latchTimer.down();
+         flush();
       }
+   }
 
+
+   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      resumeTimerIfNeeded();
+
       buffer.writeBytes(bytes);
 
       callbacks.add(callback);
@@ -259,35 +274,57 @@
       }
    }
 
+   private void resumeTimerIfNeeded()
+   {
+      if (buffer.writerIndex() == 0 && callbacks.size() == 0)
+      {
+         // Resume latch
+         latchTimer.down();
+      }
+   }
+
    public synchronized void flush()
    {
-      if (buffer.writerIndex() > 0)
+      if (buffer.writerIndex() > 0 || callbacks.size() > 0)
       {
+         // Stop latch
          latchTimer.up();
-
-         int pos = buffer.writerIndex();
-
-         if (logRates)
+         
+         if (buffer.writerIndex() == 0 && callbacks.size() > 0)
          {
-            bytesFlushed += pos;
+            // This is to perform a sync callback.
+            // When we get to here, means we have sync callbacks waiting with no buffer
+            // on this case we need to call sync on the file to make sure no other callbacks are pending
+            bufferObserver.flushBuffer(null, pendingSync, callbacks);
+   
+            callbacks = new LinkedList<IOAsyncTask>();
          }
-
-         ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
-         // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-         // Using directBuffer.put(buffer) would make several append calls for each byte
-
-         directBuffer.put(buffer.array(), 0, pos);
-
-         bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-
-         callbacks = new LinkedList<IOAsyncTask>();
-
-         active = false;
-         pendingSync = false;
-
-         buffer.clear();
-         bufferLimit = 0;
+         else
+         {
+            int pos = buffer.writerIndex();
+   
+            if (logRates)
+            {
+               bytesFlushed += pos;
+            }
+   
+            ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+   
+            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+            // Using directBuffer.put(buffer) would make several append calls for each byte
+   
+            directBuffer.put(buffer.array(), 0, pos);
+   
+            bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
+   
+            callbacks = new LinkedList<IOAsyncTask>();
+   
+            active = false;
+            pendingSync = false;
+   
+            buffer.clear();
+            bufferLimit = 0;
+         }
       }
    }
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -506,10 +506,7 @@
 
    public void sync()
    {
-      if (replicator != null)
-      {
-         replicator.sync();
-      }
+      messageJournal.sync(OperationContextImpl.getContext());
    }
 
    // Transactional operations

Modified: branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -919,10 +919,7 @@
          }
          else
          {
-            if (storageManager.isReplicated())
-            {
-               storageManager.sync();
-            }
+            storageManager.sync();
          }
 
          message.incrementRefCount(reference);

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -86,6 +86,6 @@
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
    
-   void sync();
+   void sync(OperationContext ctx);
 
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -25,6 +25,7 @@
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.replication.ReplicationManager;
 
@@ -433,6 +434,15 @@
       localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#sync()
+    */
+   public void sync(IOCompletion ctx)
+   {
+      replicationManager.sync((OperationContext)ctx);
+      localJournal.sync(ctx);
+   }
+
    /**
     * @param committedRecords
     * @param preparedTransactions

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -439,9 +439,29 @@
       replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
    }
 
-   public void sync()
+
+   public void sync(OperationContext context)
    {
-      sync(OperationContextImpl.getContext());
+      boolean executeNow = false;
+      synchronized (replicationLock)
+      {
+         context.replicationLineUp();
+         if (pendingTokens.isEmpty())
+         {
+            // this means the list is empty and we should process it now
+            executeNow = true;
+         }
+         else
+         {
+            // adding the sync to be executed in order
+            // as soon as the responses are back from the backup
+            this.pendingTokens.add(new SyncOperation(context));
+         }
+      }
+      if (executeNow)
+      {
+         context.replicationDone();
+      }
    }
 
    // Package protected ---------------------------------------------
@@ -490,31 +510,6 @@
          ctx.replicationDone();
       }
    }
-
-   private void sync(OperationContext context)
-   {
-      boolean executeNow = false;
-      synchronized (replicationLock)
-      {
-         context.replicationLineUp();
-         if (pendingTokens.isEmpty())
-         {
-            // this means the list is empty and we should process it now
-            executeNow = true;
-         }
-         else
-         {
-            // adding the sync to be executed in order
-            // as soon as the reponses are back from the backup
-            this.pendingTokens.add(new SyncOperation(context));
-         }
-      }
-      if (executeNow)
-      {
-         context.replicationDone();
-      }
-   }
-
    
    public OperationContext getContext()
    {

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -64,63 +64,48 @@
 
    // Public --------------------------------------------------------
 
-   public void testLoop() throws Exception
-   {
-      for (int i = 0 ; i < 50; i ++)
-      {
-         testSimpleOrder();
-         tearDown();
-         setUp();
-      }
-   }
-   
    public void testSimpleOrder() throws Exception
    {
       ClientSessionFactory sf = createNettyFactory();
 
-      sf.setBlockOnNonPersistentSend(true);
-      sf.setBlockOnPersistentSend(true);
+      sf.setBlockOnNonPersistentSend(false);
+      sf.setBlockOnPersistentSend(false);
       sf.setBlockOnAcknowledge(true);
 
-      
       ClientSession session = sf.createSession(true, true, 0);
-      
+
       try
       {
          session.createQueue("queue", "queue", true);
 
          ClientProducer prod = session.createProducer("queue");
 
-
          for (int i = 0; i < 100; i++)
          {
-            ClientMessage msg = session.createClientMessage(i == 0);
+            ClientMessage msg = session.createClientMessage(i % 2 == 0);
             msg.setBody(session.createBuffer(new byte[1024]));
             msg.putIntProperty("id", i);
             prod.send(msg);
          }
 
          session.close();
-         
+
          boolean started = false;
 
          for (int start = 0; start < 3; start++)
          {
-            
-            
-            if (start == 20)
+
+            if (start == 2)
             {
                started = true;
                server.stop();
                server.start();
             }
-            
+
             session = sf.createSession(true, true);
-            
+
             session.start();
-            
-//            fail(session);
-            
+
             ClientConsumer cons = session.createConsumer("queue");
 
             for (int i = 0; i < 100; i++)
@@ -144,7 +129,7 @@
                   assertEquals(i, msg.getIntProperty("id").intValue());
                }
             }
-            
+
             session.close();
          }
 
@@ -156,11 +141,10 @@
       }
 
    }
-   
-   
+
    private void fail(ClientSession session) throws InterruptedException
    {
-      
+
       final CountDownLatch latch = new CountDownLatch(1);
 
       class MyListener implements SessionFailureListener
@@ -178,8 +162,6 @@
       MyListener listener = new MyListener();
       session.addFailureListener(listener);
 
-
-      
       RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
 
       // Simulate failure on connection
@@ -190,12 +172,10 @@
       boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
 
       assertTrue(ok);
-      
+
       session.removeFailureListener(listener);
    }
 
-
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -52,12 +52,12 @@
 
    // Public --------------------------------------------------------
 
-   public void _test() throws Exception
+   public void test() throws Exception
    {
       for (int i = 0; i < 100; i++)
       {
          System.out.println("<<<<<< " + i + " >>>>>>>");
-         testTxMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
+         testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
          tearDown();
          setUp();
       }
@@ -80,7 +80,7 @@
 
       ClientSessionFactory csf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
       csf.setBlockOnNonPersistentSend(false);
-      csf.setBlockOnPersistentSend(true);
+      csf.setBlockOnPersistentSend(false);
       ClientSession session = null;
       if (transactional)
       {

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -531,7 +531,7 @@
             }
             else
             {
-               manager.sync();
+               manager.sync(OperationContextImpl.getContext());
             }
 
             OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
@@ -961,5 +961,12 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#sync(org.hornetq.core.journal.IOCompletion)
+       */
+      public void sync(IOCompletion callback)
+      {
+      }
+
    }
 }

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -85,25 +85,34 @@
 
    protected static class CountDownCallback implements AIOCallback
    {
-      private final CountDownLatch latch;
+      private final CountDownLatch latchDone;
       
+      private final CountDownLatch waitCallback;
+      
       private final List<Integer> outputList;
       
       private final int order;
       
       private final AtomicInteger errors;
 
-      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
+      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order, final CountDownLatch waitCallback)
       {
-         this.latch = latch;
+         this.latchDone = latch;
          
          this.outputList = outputList;
          
          this.order = order;
          
          this.errors = errors;
+         
+         this.waitCallback = waitCallback;
       }
 
+      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
+      {
+         this(latch, errors, outputList, order, null);
+      }
+
       volatile boolean doneCalled = false;
 
       volatile int errorCalled = 0;
@@ -112,15 +121,26 @@
 
       public void done()
       {
+         if (waitCallback != null)
+         {
+            try
+            {
+               waitCallback.await();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace(); // -> junit report
+            }
+         }
          if (outputList != null)
          {
             outputList.add(order);
          }
          doneCalled = true;
          timesDoneCalled.incrementAndGet();
-         if (latch != null)
+         if (latchDone != null)
          {
-            latch.countDown();
+            latchDone.countDown();
          }
       }
 
@@ -135,11 +155,11 @@
          {
             errors.incrementAndGet();
          }
-         if (latch != null)
+         if (latchDone != null)
          {
             // even thought an error happened, we need to inform the latch,
                // or the test won't finish
-            latch.countDown();
+            latchDone.countDown();
          }
       }
       

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -22,6 +22,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.TestSuite;
@@ -420,6 +421,60 @@
       }
    }
 
+   public void testOrderOnSynCallback() throws Exception
+   {
+      boolean closed = false;
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      ByteBuffer buffer = null;
+      try
+      {
+         final int NUMBER_LINES = 100;
+         final int SIZE = 512;
+
+         controller.open(FILE_NAME, 100);
+
+         controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+         CountDownLatch latch = new CountDownLatch(NUMBER_LINES * 2);
+
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+         buffer.rewind();
+         for (int j = 0; j < SIZE; j++)
+         {
+            buffer.put((byte)(j % Byte.MAX_VALUE));
+         }
+
+         ArrayList<Integer> result = new ArrayList<Integer>();
+         
+         for (int i = 0; i < NUMBER_LINES * 2; i++)
+         {
+            CountDownCallback aio = new CountDownCallback(latch, null, result, i);
+            if (i % 2 == 0)
+            {
+               controller.write(i * SIZE, SIZE, buffer, aio);
+            }
+            else
+            {
+               controller.syncCallback(aio);
+            }
+         }
+
+         controller.close();
+         closed = true;
+
+         // We do not wait on the latch, as close() should already wait for any pending writes
+         CountDownCallback.checkResults(NUMBER_LINES * 2, result);
+      }
+      finally
+      {
+         AsynchronousFileImpl.destroyBuffer(buffer);
+         if (!closed)
+         {
+            controller.close();
+         }
+      }
+   }
+
    public void testBufferCallbackAwaysSameBuffer() throws Exception
    {
       boolean closed = false;

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -17,11 +17,16 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.tests.util.UnitTestCase;
 
 /**
@@ -39,9 +44,9 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       factory = createFactory();
-      
+
       factory.start();
    }
 
@@ -49,13 +54,13 @@
    protected void tearDown() throws Exception
    {
       assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
-      
+
       factory.stop();
-      
+
       factory = null;
-      
+
       forceGC();
-      
+
       super.tearDown();
    }
 
@@ -172,7 +177,139 @@
       sf2.close();
 
    }
-   
+
+   public void testOrder() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
+
+      sf.open();
+      
+      factory.activateBuffer(sf);
+
+      final int records = 5000;
+      
+      sf.fill(0, records * 1024, (byte)0);
+      
+
+      final ArrayList<Integer> result = new ArrayList<Integer>();
+      
+      final CountDownLatch latch = new CountDownLatch(records);
+      
+      HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
+      
+      for (int i = 0 ; i < records; i++)
+      {
+         final int toadd = i;
+         IOAsyncTask callback = new IOAsyncTask()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+               result.add(toadd);
+               
+               latch.countDown();
+            }
+            
+         };
+         
+         if (i % 2 == 0)
+         {
+            sf.disableAutoFlush();
+            sf.fits(512);
+            sf.write(buffer, false, callback);
+            sf.enableAutoFlush();
+         }
+         else
+         {
+            sf.syncCallback(callback);
+         }
+      }
+      
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      assertEquals(records, result.size());
+      
+      int i = 0;
+      
+      for (Integer r : result)
+      {
+         assertEquals(i++, r.intValue());
+      }
+
+      
+      factory.deactivateBuffer();
+
+      sf.close();
+   }
+
+   public void testOrder2() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
+
+      sf.open();
+      
+      factory.activateBuffer(sf);
+
+      final int records = 1000;
+      
+      sf.fill(0, records * 1024, (byte)0);
+      
+
+      final ArrayList<Integer> result = new ArrayList<Integer>();
+      
+      final CountDownLatch latch = new CountDownLatch(records);
+      
+      HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
+      
+      for (int i = 0 ; i < records; i++)
+      {
+         final int toadd = i;
+         IOAsyncTask callback = new IOAsyncTask()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+               result.add(toadd);
+               
+               latch.countDown();
+            }
+            
+         };
+         
+         if (i == 10)
+         {
+            sf.write(buffer, false, callback);
+         }
+         else
+         {
+            sf.syncCallback(callback);
+         }
+      }
+      
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      assertEquals(records, result.size());
+      
+      int i = 0;
+      
+      for (Integer r : result)
+      {
+         assertEquals(i++, r.intValue());
+      }
+      
+      factory.deactivateBuffer();
+
+      sf.close();
+   }
+
    public void testRename() throws Exception
    {
       SequentialFile sf = factory.createSequentialFile("test1.hq", 1);
@@ -184,7 +321,7 @@
       assertEquals(1, fileNames.size());
 
       assertTrue(fileNames.contains("test1.hq"));
-      
+
       sf.renameTo("test1.cmp");
 
       fileNames = factory.listFiles("cmp");
@@ -204,7 +341,7 @@
       assertEquals(0, fileNames.size());
 
    }
-   
+
    public void testWriteandRead() throws Exception
    {
       SequentialFile sf = factory.createSequentialFile("write.hq", 1);
@@ -222,7 +359,7 @@
       String s3 = "echidna";
       byte[] bytes3 = s3.getBytes("UTF-8");
       ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-      
+
       long initialPos = sf.position();
       sf.writeDirect(bb1, true);
       long bytesWritten = sf.position() - initialPos;
@@ -305,7 +442,6 @@
          sf.writeDirect(bb2, true);
          bytesWritten = sf.position() - initialPos;
 
-         
          assertEquals(bb2.limit(), bytesWritten);
 
          initialPos = sf.position();
@@ -382,9 +518,9 @@
 
       try
       {
-         
+
          bb1 = factory.wrapBuffer(bytes1);
-         
+
          sf.writeDirect(bb1, true);
 
          fail("Should throw exception");

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-21 17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-21 20:37:23 UTC (rev 8363)
@@ -634,10 +634,17 @@
        */
       public void setTimedBuffer(TimedBuffer buffer)
       {
-         // TODO Auto-generated method stub
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#syncCallback(org.hornetq.core.journal.IOAsyncTask)
+       */
+      public void syncCallback(IOAsyncTask callback)
+      {
+         callback.done();
+      }
+
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list