[hornetq-commits] JBoss hornetq SVN: r8300 - in branches/ClebertTemporary: src/main/org/hornetq/core/persistence/impl/journal and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 17 14:40:59 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-17 14:40:59 -0500 (Tue, 17 Nov 2009)
New Revision: 8300

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
My refactoring starting point

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -147,6 +147,4 @@
 
 
    void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
-   void sync();
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -498,14 +498,6 @@
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
    }
 
-   public void sync()
-   {
-      if (replicator != null)
-      {
-         replicator.sync();
-      }
-   }
-
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -914,13 +914,6 @@
                }
             }
          }
-         else
-         {
-            if (storageManager.isReplicated())
-            {
-               storageManager.sync();
-            }
-         }
 
          message.incrementRefCount(reference);
       }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -243,6 +243,8 @@
 
       log.warn("Connection failure has been detected: " + me.getMessage() + " [code=" + me.getCode() + "]");
 
+      System.out.println("Fail on RemotingConnectio");
+      
       // Then call the listeners
       callFailureListeners(me);
 
@@ -399,6 +401,7 @@
 
       for (final FailureListener listener : listenersClone)
       {
+         System.out.println("Calling failure listener: " + listener.getClass().getName());
          try
          {
             listener.connectionFailed(me);
@@ -419,6 +422,8 @@
 
       for (final CloseListener listener : listenersClone)
       {
+         System.out.println("Calling listener -> " + listener);
+         System.out.println("Calling listener " + listener.getClass().getName());
          try
          {
             listener.connectionClosed();

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -26,6 +26,8 @@
 {
    /** To be called by the replication manager, when new replication is added to the queue */
    void linedUp();
+   
+   boolean hasReplication();
 
    /** To be called by the replication manager, when data is confirmed on the channel */
    void replicated();
@@ -37,5 +39,9 @@
    
    /** Flush all pending callbacks on the Context */
    void flush();
+   
+   boolean isSync();
+   
+   void setSync(boolean sync);
 
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -88,6 +88,4 @@
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
 
-   void sync();
-
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -29,11 +29,15 @@
 public class ReplicationContextImpl implements ReplicationContext
 {
    private List<Runnable> tasks;
-   
-   private AtomicInteger pendings = new AtomicInteger(0);
-   
+
+   private int linedup = 0;
+
+   private int replicated = 0;
+
+   private boolean sync = false;
+
    private volatile boolean complete = false;
-   
+
    /**
     * @param executor
     */
@@ -45,9 +49,14 @@
    /** To be called by the replication manager, when new replication is added to the queue */
    public void linedUp()
    {
-      pendings.incrementAndGet();
+      linedup++;
    }
 
+   public boolean hasReplication()
+   {
+      return linedup > 0;
+   }
+
    /** You may have several actions to be done after a replication operation is completed. */
    public void addReplicationAction(Runnable runnable)
    {
@@ -63,32 +72,32 @@
          // We don't add any more Runnables after it is complete
          tasks = new ArrayList<Runnable>();
       }
-      
+
       tasks.add(runnable);
    }
 
    /** To be called by the replication manager, when data is confirmed on the channel */
    public synchronized void replicated()
    {
-      if (pendings.decrementAndGet() == 0 && complete)
+      // roundtrip packets won't have lined up packets
+      if (++replicated == linedup && complete)
       {
          flush();
       }
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationToken#complete()
     */
    public synchronized void complete()
    {
       complete = true;
-      if (pendings.get() == 0 && complete)
+      if (replicated == linedup && complete)
       {
          flush();
       }
-  }
-   
+   }
+
    public synchronized void flush()
    {
       if (tasks != null)
@@ -100,6 +109,18 @@
          tasks.clear();
       }
    }
-   
-   
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+    */
+   public boolean isSync()
+   {
+      return sync;
+   }
+
+   public void setSync(final boolean sync)
+   {
+      this.sync = sync;
+   }
+
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,7 +36,6 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -84,8 +85,6 @@
 
    private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
 
-   private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
- 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -278,14 +277,6 @@
          sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
       }
    }
-   
-   public void sync()
-   {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationSyncContextMessage());
-      }
-   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
@@ -351,9 +342,9 @@
                log.warn(e.getMessage(), e);
             }
          }
-         
+
          public void beforeReconnect(HornetQException me)
-         {            
+         {
          }
       });
 
@@ -369,14 +360,22 @@
    {
       enabled = false;
       
+      LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+      
+      // The same context will be replicated on the pending tokens...
+      // as the multiple operations will be replicated on the same context
+      while (!pendingTokens.isEmpty())
+      {
+         ReplicationContext ctx = pendingTokens.poll();
+         activeContexts.add(ctx);
+      }
+
       for (ReplicationContext ctx : activeContexts)
       {
          ctx.complete();
          ctx.flush();
       }
-      
-      activeContexts.clear();
-      
+
       if (replicatingChannel != null)
       {
          replicatingChannel.close();
@@ -400,7 +399,6 @@
       if (token == null)
       {
          token = new ReplicationContextImpl();
-         activeContexts.add(token);
          tlReplicationContext.set(token);
       }
       return token;
@@ -420,29 +418,38 @@
    public void closeContext()
    {
       final ReplicationContext token = tlReplicationContext.get();
-      
+
       if (token != null)
       {
          // Disassociate thread local
          tlReplicationContext.set(null);
          // Remove from pending tokens as soon as this is complete
-         token.addReplicationAction(new Runnable()
+         if (!token.hasReplication())
          {
-            public void run()
-            {
-               activeContexts.remove(token);
-            }
-         });
+            sync(token);
+         }
          token.complete();
       }
    }
 
-   /* (non-Javadoc)
+   /* method for testcases only
     * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
     */
    public Set<ReplicationContext> getActiveTokens()
    {
+      
+      LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+      
+      // The same context will be replicated on the pending tokens...
+      // as the multiple operations will be replicated on the same context
+      
+      for (ReplicationContext ctx : pendingTokens)
+      {
+         activeContexts.add(ctx);
+      }
+      
       return activeContexts;
+
    }
 
    private void sendReplicatePacket(final Packet packet)
@@ -475,7 +482,7 @@
          repliToken.replicated();
       }
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
     */
@@ -484,18 +491,14 @@
       replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
    }
 
-
    private void replicated()
    {
-      ReplicationContext tokenPolled = pendingTokens.poll();
-      if (tokenPolled == null)
+      ArrayList<ReplicationContext> tokensToExecute = getTokens();
+
+      for (ReplicationContext ctx : tokensToExecute)
       {
-         throw new IllegalStateException("Missing replication token on the queue.");
+         ctx.replicated();
       }
-      else
-      {
-         tokenPolled.replicated();
-      }
    }
 
    // Package protected ---------------------------------------------
@@ -504,6 +507,80 @@
 
    // Private -------------------------------------------------------
 
+   private void sync(ReplicationContext context)
+   {
+      boolean executeNow = false;
+      synchronized (replicationLock)
+      {
+         context.linedUp();
+         context.setSync(true);
+         if (pendingTokens.isEmpty())
+         {
+            // this means the list is empty and we should process it now
+            executeNow = true;
+         }
+         else
+         {
+            // adding the sync to be executed in order
+            // as soon as the reponses are back from the backup
+            this.pendingTokens.add(context);
+         }
+      }
+      if (executeNow)
+      {
+         context.replicated();
+      }
+   }
+
+
+   /**
+    * This method will first get all the sync tokens (that won't go to the backup node)
+    * Then it will get the round trip tokens.
+    * At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
+    * @return
+    */
+   private ArrayList<ReplicationContext> getTokens()
+   {
+      ArrayList<ReplicationContext> retList = new ArrayList<ReplicationContext>(1);
+
+      ReplicationContext tokenPolled = null;
+
+      // First will get all the non replicated tokens up to the first one that is not replicated
+      do
+      {
+         tokenPolled = pendingTokens.poll();
+
+         if (tokenPolled == null)
+         {
+            throw new IllegalStateException("Missing replication token on the queue.");
+         }
+
+         retList.add(tokenPolled);
+
+      }
+      while (tokenPolled.isSync());
+
+      // This is to avoid a situation where we won't have more replicated packets
+      // all the packets will need to be processed
+      synchronized (replicationLock)
+      {
+         while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+         {
+            tokenPolled = pendingTokens.poll();
+            if (!tokenPolled.isSync())
+            {
+               throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
+            }
+
+            retList.add(tokenPolled);
+
+         }
+      }
+
+      return retList;
+   }
+
+
    // Inner classes -------------------------------------------------
 
    protected class ResponseHandler implements ChannelHandler

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-17 19:40:59 UTC (rev 8300)
@@ -539,10 +539,6 @@
             {
                replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
             }
-            else
-            {
-               manager.sync();
-            }
 
 
             manager.afterReplicated(new Runnable()



More information about the hornetq-commits mailing list