[hornetq-commits] JBoss hornetq SVN: r8366 - in branches/ClebertCallback: src/main/org/hornetq/core/asyncio/impl and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 21 18:29:51 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-21 18:29:50 -0500 (Sat, 21 Nov 2009)
New Revision: 8366

Removed:
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
   branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
Modified:
   branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Removing sync operations on Replication and journal

Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -41,20 +41,13 @@
     * */
    long size() throws HornetQException;
 
-   /** Some operations may need to be done only after persitency is done.
-    *  for instance, when a messaging system needs to guarantee ordering over non-persistent data, 
-    *  it needs to make sure it will only deliver the message after all the data is persisted. 
-    *  The sync won't perform any disk operation however it will wait for all the current pending operations
-    *  on this file to be finished */
-   void syncCallback(AIOCallback aioCallback);
-
    /** Any error will be reported on the callback interface */ 
    void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
 
    void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
 
    void fill(long position, int blocks, long size, byte fillChar) throws HornetQException;
-   
+
    void setBufferCallback(BufferCallback callback);
 
    int getBlockSize();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -295,9 +295,9 @@
             public void run()
             {
                writeSemaphore.acquireUninterruptibly();
-               
-               final long sequence = nextWritingSequence.getAndIncrement();
 
+               long sequence = nextWritingSequence.getAndIncrement();
+
                try
                {
                   write(handler, sequence, position, size, directByteBuffer, aioCallback);
@@ -321,7 +321,7 @@
       {
          writeSemaphore.acquireUninterruptibly();
 
-         final long sequence = nextWritingSequence.getAndIncrement();
+         long sequence = nextWritingSequence.getAndIncrement();
 
          try
          {
@@ -438,42 +438,16 @@
       }
    }
 
-   // Callback methods ------------------------------------------------------------------
+   // Private ---------------------------------------------------------------------------
 
-   public void syncCallback(final AIOCallback callback)
-   {
-      pendingWrites.up();
-      
-      writeExecutor.execute(new Runnable()
-      {
-         public void run()
-         {
-            callbackLock.lock();
-
-            try
-            {
-               final long sequence = nextWritingSequence.getAndIncrement();
-
-               // This will execute the callback immediately if nothing is pending,
-               // or it will place it to the queue waiting for a response
-               executeCallback(callback, sequence);
-
-            }
-            finally
-            {
-               callbackLock.unlock();
-            }
-         }
-      });
-      
-   }
-   
    /** */
    @SuppressWarnings("unused")
    private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
    {
       writeSemaphore.release();
 
+      pendingWrites.down();
+
       callbackLock.lock();
 
       try
@@ -482,11 +456,20 @@
          if (sequence == -1)
          {
             callback.done();
-            pendingWrites.down();
          }
          else
          {
-            executeCallback(callback, sequence);
+            if (sequence == nextReadSequence)
+            {
+               nextReadSequence++;
+               callback.done();
+               flushCallbacks();
+            }
+            else
+            {
+               // System.out.println("Buffering callback");
+               pendingCallbacks.add(new CallbackHolder(sequence, callback));
+            }
          }
 
          // The buffer is not sent on callback for read operations
@@ -501,26 +484,6 @@
       }
    }
 
-   /**
-    * @param callback
-    * @param sequence
-    */
-   private void executeCallback(final AIOCallback callback, final long sequence)
-   {
-      if (sequence == nextReadSequence)
-      {
-         nextReadSequence++;
-         callback.done();
-         pendingWrites.down();
-         flushCallbacks();
-      }
-      else
-      {
-         // System.out.println("Buffering callback");
-         pendingCallbacks.add(new CallbackHolder(sequence, callback));
-      }
-   }
-
    private void flushCallbacks()
    {
       while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
@@ -535,7 +498,6 @@
          {
             holder.callback.done();
          }
-         pendingWrites.down();
          nextReadSequence++;
       }
    }
@@ -588,8 +550,6 @@
       }
    }
 
-   // Private ---------------------------------------------------------------------------
-
    private void pollEvents()
    {
       if (!opened)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -112,7 +112,5 @@
 
    void perfBlast(int pages) throws Exception;
 
-   void sync(IOCompletion callback);
 
-
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -80,9 +80,6 @@
 
    void sync() throws Exception;
 
-   /** This method will make sure the parameter callback will be invoked after all pending sync operations are done */
-   void syncCallback(IOAsyncTask callback);
-
    long size() throws Exception;
    
    void renameTo(String newFileName) throws Exception;

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -304,14 +304,4 @@
          throw new IllegalStateException("File not opened");
       }
    }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
-    */
-   @Override
-   protected void syncCallbackDirect(IOAsyncTask callback)
-   {
-      aioFile.syncCallback(callback);
-   }
-
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -16,6 +16,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOAsyncTask;
@@ -189,30 +190,11 @@
          write(bytes, false, DummyCallback.getInstance());
       }
    }
-   
-   /**
-    * invoke the callback after all pending operations are complete.
-    */
-   public void syncCallback(IOAsyncTask callback)
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.syncCallback(callback);
-      }
-      else
-      {
-         syncCallbackDirect(callback);
-      }
-   }
-   
 
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
-   protected abstract void syncCallbackDirect(IOAsyncTask callback);
-
    protected File getFile()
    {
       return file;
@@ -270,22 +252,15 @@
    {
       public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
       {
-         if (buffer == null)
+         buffer.flip();
+
+         if (buffer.limit() == 0)
          {
-            syncCallbackDirect(new DelegateCallback(callbacks));
+            factory.releaseBuffer(buffer);
          }
          else
          {
-            buffer.flip();
-   
-            if (buffer.limit() == 0)
-            {
-               factory.releaseBuffer(buffer);
-            }
-            else
-            {
-               writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
-            }
+            writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
          }
       }
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -864,31 +864,7 @@
          callback.waitCompletion();
       }
    }
-   
-   public void sync(IOCompletion callback)
-   {
-      callback.lineUp();
 
-      compactingLock.readLock().lock();
-
-      try
-      {
-         lockAppend.lock();
-         try
-         {
-            currentFile.getFile().syncCallback(callback);
-         }
-         finally
-         {
-            lockAppend.unlock();
-         }
-      }
-      finally
-      {
-         compactingLock.readLock().unlock();
-      }
-   }
-
    public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
    {
       if (LOAD_TRACE)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -220,18 +220,6 @@
       internalWrite(bytes, sync, null);
    }
 
-   
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
-    */
-   @Override
-   protected void syncCallbackDirect(IOAsyncTask callback)
-   {
-      // Nothing to be done on NIO.
-      // Timed buffer took care of everything
-      callback.done();
-   }
-   
    /**
     * @param bytes
     * @param sync
@@ -255,5 +243,4 @@
          callback.done();
       }
    }
-
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -225,30 +225,15 @@
          return true;
       }
    }
-   
 
-   /**
-    * This method will make sure this callback will be executed after all the pending callbacks
-    */
-   public synchronized void syncCallback(IOAsyncTask callback)
+   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
    {
-      resumeTimerIfNeeded();
-      
-      callbacks.add(callback);
-      
-      pendingSync = true;
-      
-      if (flushOnSync)
+      if (buffer.writerIndex() == 0)
       {
-         flush();
+         // Resume latch
+         latchTimer.down();
       }
-   }
 
-
-   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      resumeTimerIfNeeded();
-
       buffer.writeBytes(bytes);
 
       callbacks.add(callback);
@@ -274,57 +259,35 @@
       }
    }
 
-   private void resumeTimerIfNeeded()
-   {
-      if (buffer.writerIndex() == 0 && callbacks.size() == 0)
-      {
-         // Resume latch
-         latchTimer.down();
-      }
-   }
-
    public synchronized void flush()
    {
-      if (buffer.writerIndex() > 0 || callbacks.size() > 0)
+      if (buffer.writerIndex() > 0)
       {
-         // Stop latch
          latchTimer.up();
-         
-         if (buffer.writerIndex() == 0 && callbacks.size() > 0)
+
+         int pos = buffer.writerIndex();
+
+         if (logRates)
          {
-            // This is to perform a sync callback.
-            // When we get to here, means we have sync callbacks waiting with no buffer
-            // on this case we need to call sync on the file to make sure no other callbacks are pending
-            bufferObserver.flushBuffer(null, pendingSync, callbacks);
-   
-            callbacks = new LinkedList<IOAsyncTask>();
+            bytesFlushed += pos;
          }
-         else
-         {
-            int pos = buffer.writerIndex();
-   
-            if (logRates)
-            {
-               bytesFlushed += pos;
-            }
-   
-            ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-   
-            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-            // Using directBuffer.put(buffer) would make several append calls for each byte
-   
-            directBuffer.put(buffer.array(), 0, pos);
-   
-            bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-   
-            callbacks = new LinkedList<IOAsyncTask>();
-   
-            active = false;
-            pendingSync = false;
-   
-            buffer.clear();
-            bufferLimit = 0;
-         }
+
+         ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+
+         // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+         // Using directBuffer.put(buffer) would make several append calls for each byte
+
+         directBuffer.put(buffer.array(), 0, pos);
+
+         bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
+
+         callbacks = new LinkedList<IOAsyncTask>();
+
+         active = false;
+         pendingSync = false;
+
+         buffer.clear();
+         bufferLimit = 0;
       }
    }
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -161,6 +161,4 @@
 
 
    void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
-   void sync();
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -515,11 +515,6 @@
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
    }
 
-   public void sync()
-   {
-      messageJournal.sync(OperationContextImpl.getInstance());
-   }
-
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception

Deleted: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -1,154 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import java.util.concurrent.Executor;
-
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.persistence.OperationContext;
-
-/**
- * A SyncOperation
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class SyncOperation implements OperationContext
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-   
-   OperationContext ctx;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-   
-   public SyncOperation (OperationContext ctx)
-   {
-      this.ctx = ctx;
-   }
-
-   // Public --------------------------------------------------------
-
-   /**
-    * 
-    * @see org.hornetq.core.persistence.OperationContext#complete()
-    */
-   public void complete()
-   {
-      ctx.complete();
-   }
-
-   /**
-    * 
-    * @see org.hornetq.core.asyncio.AIOCallback#done()
-    */
-   public void done()
-   {
-      ctx.done();
-   }
-
-   /**
-    * @param runnable
-    * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
-    */
-   public void executeOnCompletion(IOAsyncTask runnable)
-   {
-      ctx.executeOnCompletion(runnable);
-   }
-
-   /**
-    * @return
-    * @see org.hornetq.core.persistence.OperationContext#hasReplication()
-    */
-   public boolean hasReplication()
-   {
-      return ctx.hasReplication();
-   }
-
-   /**
-    * @return
-    * @see org.hornetq.core.persistence.OperationContext#isSync()
-    */
-   public boolean isSync()
-   {
-      return true;
-   }
-
-   /**
-    * 
-    * @see org.hornetq.core.journal.IOCompletion#lineUp()
-    */
-   public void lineUp()
-   {
-      ctx.lineUp();
-   }
-
-   /**
-    * @param errorCode
-    * @param errorMessage
-    * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
-    */
-   public void onError(int errorCode, String errorMessage)
-   {
-      ctx.onError(errorCode, errorMessage);
-   }
-
-   /**
-    * 
-    * @see org.hornetq.core.persistence.OperationContext#replicationDone()
-    */
-   public void replicationDone()
-   {
-      ctx.replicationDone();
-   }
-
-   /**
-    * 
-    * @see org.hornetq.core.persistence.OperationContext#replicationLineUp()
-    */
-   public void replicationLineUp()
-   {
-      ctx.replicationLineUp();
-   }
-
-   /**
-    * @see org.hornetq.core.persistence.OperationContext#setExecutor(java.util.concurrent.Executor)
-    */
-   public void setExecutor(Executor executor)
-   {
-      ctx.setExecutor(executor);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.persistence.OperationContext#reattach()
-    */
-   public void reinstall()
-   {
-      OperationContextImpl.setInstance(this);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -29,7 +29,6 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -94,7 +93,6 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -416,11 +414,6 @@
             packet = new ReplicationDeleteMessage();
             break;
          }
-         case REPLICATION_SYNC:
-         {
-            packet = new ReplicationSyncContextMessage();
-            break;
-         }
          case REPLICATION_DELETE_TX:
          {
             packet = new ReplicationDeleteTXMessage();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -172,8 +172,6 @@
    public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
    
    public static final byte REPLICATION_COMPARE_DATA = 92;
-   
-   public static final byte REPLICATION_SYNC = 93;
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Deleted: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -1,80 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl.wireformat;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-/**
- * Message sent when a Replication Context is complete without any persistence replicated.
- * On that case we need to go over the cluster to make sure we get the data sent at the right order.
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationSyncContextMessage extends PacketImpl
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ReplicationSyncContextMessage()
-   {
-      super(REPLICATION_SYNC);
-   }
-
-   // Public --------------------------------------------------------
-
-   @Override
-   public int getRequiredBufferSize()
-   {
-      return BASIC_PACKET_SIZE;
-
-   }
-
-   @Override
-   public void encodeBody(final HornetQBuffer buffer)
-   {
-   }
-
-   @Override
-   public void decodeBody(final HornetQBuffer buffer)
-   {
-   }
-
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -84,6 +84,4 @@
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
    
-   void sync(OperationContext ctx);
-
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -25,7 +25,6 @@
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.replication.ReplicationManager;
 
@@ -434,15 +433,6 @@
       localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.Journal#sync()
-    */
-   public void sync(IOCompletion ctx)
-   {
-      replicationManager.sync((OperationContext)ctx);
-      localJournal.sync(ctx);
-   }
-
    /**
     * @param committedRecords
     * @param preparedTransactions

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -17,7 +17,6 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -163,11 +162,6 @@
             handleCompareDataMessage((ReplicationCompareDataMessage)packet);
             response = new NullResponseMessage();
          }
-         else if (packet.getType() == REPLICATION_SYNC)
-         {
-            // https://jira.jboss.org/jira/browse/HORNETQ-218
-            // Nothing to be done, we just needed a round trip to process events in order
-         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -29,7 +29,6 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.persistence.impl.journal.SyncOperation;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Packet;
@@ -421,31 +420,6 @@
       replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
    }
 
-
-   public void sync(OperationContext context)
-   {
-      boolean executeNow = false;
-      synchronized (replicationLock)
-      {
-         context.replicationLineUp();
-         if (pendingTokens.isEmpty())
-         {
-            // this means the list is empty and we should process it now
-            executeNow = true;
-         }
-         else
-         {
-            // adding the sync to be executed in order
-            // as soon as the reponses are back from the backup
-            this.pendingTokens.add(new SyncOperation(context));
-         }
-      }
-      if (executeNow)
-      {
-         context.replicationDone();
-      }
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -492,7 +466,7 @@
          ctx.replicationDone();
       }
    }
-   
+
    public OperationContext getContext()
    {
       return OperationContextImpl.getInstance();

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -85,34 +85,25 @@
 
    protected static class CountDownCallback implements AIOCallback
    {
-      private final CountDownLatch latchDone;
+      private final CountDownLatch latch;
       
-      private final CountDownLatch waitCallback;
-      
       private final List<Integer> outputList;
       
       private final int order;
       
       private final AtomicInteger errors;
 
-      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order, final CountDownLatch waitCallback)
+      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
       {
-         this.latchDone = latch;
+         this.latch = latch;
          
          this.outputList = outputList;
          
          this.order = order;
          
          this.errors = errors;
-         
-         this.waitCallback = waitCallback;
       }
 
-      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
-      {
-         this(latch, errors, outputList, order, null);
-      }
-
       volatile boolean doneCalled = false;
 
       volatile int errorCalled = 0;
@@ -121,26 +112,15 @@
 
       public void done()
       {
-         if (waitCallback != null)
-         {
-            try
-            {
-               waitCallback.await();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace(); // -> junit report
-            }
-         }
          if (outputList != null)
          {
             outputList.add(order);
          }
          doneCalled = true;
          timesDoneCalled.incrementAndGet();
-         if (latchDone != null)
+         if (latch != null)
          {
-            latchDone.countDown();
+            latch.countDown();
          }
       }
 
@@ -155,11 +135,11 @@
          {
             errors.incrementAndGet();
          }
-         if (latchDone != null)
+         if (latch != null)
          {
             // even thought an error happened, we need to inform the latch,
                // or the test won't finish
-            latchDone.countDown();
+            latch.countDown();
          }
       }
       

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -22,7 +22,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.TestSuite;
@@ -421,60 +420,6 @@
       }
    }
 
-   public void testOrderOnSynCallback() throws Exception
-   {
-      boolean closed = false;
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
-      ByteBuffer buffer = null;
-      try
-      {
-         final int NUMBER_LINES = 100;
-         final int SIZE = 512;
-
-         controller.open(FILE_NAME, 100);
-
-         controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
-         CountDownLatch latch = new CountDownLatch(NUMBER_LINES * 2);
-
-         buffer = AsynchronousFileImpl.newBuffer(SIZE);
-         buffer.rewind();
-         for (int j = 0; j < SIZE; j++)
-         {
-            buffer.put((byte)(j % Byte.MAX_VALUE));
-         }
-
-         ArrayList<Integer> result = new ArrayList<Integer>();
-         
-         for (int i = 0; i < NUMBER_LINES * 2; i++)
-         {
-            CountDownCallback aio = new CountDownCallback(latch, null, result, i);
-            if (i % 2 == 0)
-            {
-               controller.write(i * SIZE, SIZE, buffer, aio);
-            }
-            else
-            {
-               controller.syncCallback(aio);
-            }
-         }
-
-         controller.close();
-         closed = true;
-
-         // We are not waiting the latch, as close should already hold for any writes
-         CountDownCallback.checkResults(NUMBER_LINES * 2, result);
-      }
-      finally
-      {
-         AsynchronousFileImpl.destroyBuffer(buffer);
-         if (!closed)
-         {
-            controller.close();
-         }
-      }
-   }
-
    public void testBufferCallbackAwaysSameBuffer() throws Exception
    {
       boolean closed = false;

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -17,16 +17,11 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.tests.util.UnitTestCase;
 
 /**
@@ -44,9 +39,9 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-
+      
       factory = createFactory();
-
+      
       factory.start();
    }
 
@@ -54,13 +49,13 @@
    protected void tearDown() throws Exception
    {
       assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
-
+      
       factory.stop();
-
+      
       factory = null;
-
+      
       forceGC();
-
+      
       super.tearDown();
    }
 
@@ -177,139 +172,7 @@
       sf2.close();
 
    }
-
-   public void testOrder() throws Exception
-   {
-      SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
-
-      sf.open();
-      
-      factory.activateBuffer(sf);
-
-      final int records = 5000;
-      
-      sf.fill(0, records * 1024, (byte)0);
-      
-
-      final ArrayList<Integer> result = new ArrayList<Integer>();
-      
-      final CountDownLatch latch = new CountDownLatch(records);
-      
-      HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
-      
-      for (int i = 0 ; i < records; i++)
-      {
-         final int toadd = i;
-         IOAsyncTask callback = new IOAsyncTask()
-         {
-            
-            public void onError(int errorCode, String errorMessage)
-            {
-            }
-            
-            public void done()
-            {
-               result.add(toadd);
-               
-               latch.countDown();
-            }
-            
-         };
-         
-         if (i % 2 == 0)
-         {
-            sf.disableAutoFlush();
-            sf.fits(512);
-            sf.write(buffer, false, callback);
-            sf.enableAutoFlush();
-         }
-         else
-         {
-            sf.syncCallback(callback);
-         }
-      }
-      
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
-      
-      assertEquals(records, result.size());
-      
-      int i = 0;
-      
-      for (Integer r : result)
-      {
-         assertEquals(i++, r.intValue());
-      }
-
-      
-      factory.deactivateBuffer();
-
-      sf.close();
-   }
-
-   public void testOrder2() throws Exception
-   {
-      SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
-
-      sf.open();
-      
-      factory.activateBuffer(sf);
-
-      final int records = 1000;
-      
-      sf.fill(0, records * 1024, (byte)0);
-      
-
-      final ArrayList<Integer> result = new ArrayList<Integer>();
-      
-      final CountDownLatch latch = new CountDownLatch(records);
-      
-      HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
-      
-      for (int i = 0 ; i < records; i++)
-      {
-         final int toadd = i;
-         IOAsyncTask callback = new IOAsyncTask()
-         {
-            
-            public void onError(int errorCode, String errorMessage)
-            {
-            }
-            
-            public void done()
-            {
-               result.add(toadd);
-               
-               latch.countDown();
-            }
-            
-         };
-         
-         if (i == 10)
-         {
-            sf.write(buffer, false, callback);
-         }
-         else
-         {
-            sf.syncCallback(callback);
-         }
-      }
-      
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
-      
-      assertEquals(records, result.size());
-      
-      int i = 0;
-      
-      for (Integer r : result)
-      {
-         assertEquals(i++, r.intValue());
-      }
-      
-      factory.deactivateBuffer();
-
-      sf.close();
-   }
-
+   
    public void testRename() throws Exception
    {
       SequentialFile sf = factory.createSequentialFile("test1.hq", 1);
@@ -321,7 +184,7 @@
       assertEquals(1, fileNames.size());
 
       assertTrue(fileNames.contains("test1.hq"));
-
+      
       sf.renameTo("test1.cmp");
 
       fileNames = factory.listFiles("cmp");
@@ -341,7 +204,7 @@
       assertEquals(0, fileNames.size());
 
    }
-
+   
    public void testWriteandRead() throws Exception
    {
       SequentialFile sf = factory.createSequentialFile("write.hq", 1);
@@ -359,7 +222,7 @@
       String s3 = "echidna";
       byte[] bytes3 = s3.getBytes("UTF-8");
       ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
+      
       long initialPos = sf.position();
       sf.writeDirect(bb1, true);
       long bytesWritten = sf.position() - initialPos;
@@ -442,6 +305,7 @@
          sf.writeDirect(bb2, true);
          bytesWritten = sf.position() - initialPos;
 
+         
          assertEquals(bb2.limit(), bytesWritten);
 
          initialPos = sf.position();
@@ -518,9 +382,9 @@
 
       try
       {
-
+         
          bb1 = factory.wrapBuffer(bytes1);
-
+         
          sf.writeDirect(bb1, true);
 
          fail("Should throw exception");

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-21 23:29:50 UTC (rev 8366)
@@ -634,17 +634,10 @@
        */
       public void setTimedBuffer(TimedBuffer buffer)
       {
+         // TODO Auto-generated method stub
          
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.journal.SequentialFile#syncCallback(org.hornetq.core.journal.IOAsyncTask)
-       */
-      public void syncCallback(IOAsyncTask callback)
-      {
-         callback.done();
-      }
-
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list