[hornetq-commits] JBoss hornetq SVN: r8045 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 5 15:18:07 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-05 15:18:07 -0400 (Mon, 05 Oct 2009)
New Revision: 8045

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -48,6 +48,8 @@
    
    void afterReplicated(Runnable run);
    
+   void completeReplication();
+   
    UUID getPersistentID();
    
    void setPersistentID(UUID id) throws Exception;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -254,7 +254,7 @@
       }
       else
       {
-         this.messageJournal = localBindings;
+         this.messageJournal = localMessage;
       }
 
       
@@ -265,6 +265,17 @@
       perfBlastPages = config.getJournalPerfBlastPages();
    }
    
+   /* (non-Javadoc) Calls replicator.completeToken() when a replicator is set; does nothing otherwise.
+    * @see org.hornetq.core.persistence.StorageManager#completeReplication()
+    */
+   public void completeReplication()
+   {
+      if (replicator != null)
+      {
+         replicator.completeToken();
+      }
+   }
+
    public boolean isReplicated()
    {
       return replicator != null;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -261,4 +261,11 @@
       return false;
    }
 
+   /* (non-Javadoc) Empty implementation: this method performs no work.
+    * @see org.hornetq.core.persistence.StorageManager#completeReplication()
+    */
+   public void completeReplication()
+   {
+   }
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -144,18 +144,38 @@
 
       tx.commit();
 
-      count++;
-
-      if (count == batchSize)
+      
+      Runnable action = new Runnable()
       {
-         // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
-         // long time in the case there are many messages in the queue
-         active = false;
-
-         executor.execute(new Prompter());
-
-         count = 0;
+         public void run()
+         {
+            
+            count++;
+      
+            if (count == batchSize)
+            {
+               // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+               // long time in the case there are many messages in the queue
+               active = false;
+      
+               
+               executor.execute(new Prompter());
+      
+               count = 0;
+            }
+            
+         }
+      };
+      
+      if (storageManager.isReplicated())
+      {
+         storageManager.afterReplicated(action);
+         storageManager.completeReplication();
       }
+      else
+      {
+         action.run();
+      }
    }
 
    private class Prompter implements Runnable

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -755,35 +755,6 @@
       return nodeID;
    }
 
-   public void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception
-   {
-      Binding binding = postOffice.getBinding(queueName);
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find queue " + queueName);
-      }
-
-      Queue queue = (Queue)binding.getBindable();
-
-      MessageReference reference = queue.removeFirstReference(messageID);
-
-      Transaction tx = new TransactionImpl(storageManager);
-
-      boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
-
-      if (routed)
-      {
-         queue.acknowledge(tx, reference);
-
-         tx.commit();
-      }
-      else
-      {
-         throw new IllegalStateException("Must be routed");
-      }
-   }
-
    public Queue createQueue(final SimpleString address,
                             final SimpleString queueName,
                             final SimpleString filterString,

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -420,9 +420,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleCreateQueue(final CreateQueueMessage packet)
@@ -495,9 +493,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
@@ -533,9 +529,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
@@ -590,9 +584,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
@@ -636,9 +628,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleAcknowledge(final SessionAcknowledgeMessage packet)
@@ -673,12 +663,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      if (response != null)
-      {
-         channel.send(response);
-      }
+      sendResponse(packet, response, false, false);
    }
 
    public void handleExpired(final SessionExpiredMessage packet)
@@ -697,7 +682,8 @@
          log.error("Failed to acknowledge", e);
       }
 
-      channel.confirm(packet);
+
+      sendResponse(packet, null, false, false);
    }
 
    public void handleCommit(final Packet packet)
@@ -728,9 +714,7 @@
          tx = new TransactionImpl(storageManager);
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleRollback(final RollbackMessage packet)
@@ -757,9 +741,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXACommit(final SessionXACommitMessage packet)
@@ -820,9 +802,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAEnd(final SessionXAEndMessage packet)
@@ -894,9 +874,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAForget(final SessionXAForgetMessage packet)
@@ -906,9 +884,7 @@
 
       Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAJoin(final SessionXAJoinMessage packet)
@@ -957,9 +933,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAResume(final SessionXAResumeMessage packet)
@@ -1019,9 +993,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXARollback(final SessionXARollbackMessage packet)
@@ -1082,9 +1054,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAStart(final SessionXAStartMessage packet)
@@ -1133,9 +1103,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXASuspend(final Packet packet)
@@ -1182,9 +1150,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleXAPrepare(final SessionXAPrepareMessage packet)
@@ -1242,43 +1208,35 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleGetInDoubtXids(final Packet packet)
    {
       Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleGetXATimeout(final Packet packet)
    {
       Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
    {
       Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleStart(final Packet packet)
    {
       setStarted(true);
 
-      channel.confirm(packet);
+      sendResponse(packet, null, false, false);
    }
 
    public void handleStop(final Packet packet)
@@ -1287,9 +1245,7 @@
 
       setStarted(false);
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleClose(final Packet packet)
@@ -1316,14 +1272,8 @@
          }
       }
 
-      channel.confirm(packet);
+      sendResponse(packet, response, true, true);
 
-      // We flush the confirmations to make sure any send confirmations get handled on the client side
-      channel.flushConfirmations();
-
-      channel.send(response);
-
-      channel.close();
    }
 
    public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
@@ -1359,9 +1309,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      channel.send(response);
+      sendResponse(packet, response, false, false);
    }
 
    public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
@@ -1383,7 +1331,8 @@
          log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);
       }
 
-      channel.confirm(packet);
+
+      sendResponse(packet, null, false, false);
    }
 
    public void handleSendLargeMessage(final SessionSendLargeMessage packet)
@@ -1418,7 +1367,7 @@
          log.error("Failed to send message", e);
       }
 
-      channel.confirm(packet);
+      sendResponse(packet, null, false, false);
    }
 
    public void handleSend(final SessionSendMessage packet)
@@ -1465,13 +1414,8 @@
             }
          }
       }
-
-      channel.confirm(packet);
-
-      if (response != null)
-      {
-         channel.send(response);
-      }
+      
+      sendResponse(packet, response, false, false);
    }
 
    public void handleSendContinuations(final SessionSendContinuationMessage packet)
@@ -1520,12 +1464,7 @@
          }
       }
 
-      channel.confirm(packet);
-
-      if (response != null)
-      {
-         channel.send(response);
-      }
+      sendResponse(packet, response, false, false);
    }
 
    public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
@@ -1638,6 +1577,50 @@
    // Private
    // ----------------------------------------------------------------------------
 
+   /**
+    * Confirms confirmPacket (flushing confirmations if flush), sends response, then closes the channel if closeChannel.
+    * @param confirmPacket packet passed to channel.confirm(); when null, neither the confirm nor the flush happens
+    * @param response packet passed to channel.send(); skipped when null
+    */
+   private void sendResponse(final Packet confirmPacket, final Packet response, final boolean flush, final boolean closeChannel)
+   {
+      Runnable action = new Runnable()
+      {
+         public void run()
+         {
+            if (confirmPacket != null)
+            {
+               channel.confirm(confirmPacket);
+               if (flush)
+               {
+                  channel.flushConfirmations();
+               }
+            }
+
+            if (response != null)
+            {
+               channel.send(response);
+            }
+            
+            if (closeChannel)
+            {
+               channel.close();
+            }
+         }
+      };
+      // Replicated: hand the action to storageManager.afterReplicated(); otherwise run it directly on the calling thread
+      if (storageManager.isReplicated())
+      {
+         storageManager.afterReplicated(action);
+         storageManager.completeReplication();
+      }
+      else
+      {
+         action.run();
+      }
+   }
+
+
    private void setStarted(final boolean s)
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-05 19:18:07 UTC (rev 8045)
@@ -201,7 +201,61 @@
          server.stop();
       }
    }
+   /** Checks that getActiveTokens() holds one entry after the afterReplicated callback runs and none once completeToken() is called. */
+   public void testNoActions() throws Exception
+   {
 
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(true);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         manager.start();
+
+         Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new FakeJournal(), manager);
+
+         replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
+         // The afterReplicated callback must count the latch down within one second
+         final CountDownLatch latch = new CountDownLatch(1);
+         manager.afterReplicated(new Runnable()
+         {
+
+            public void run()
+            {
+               latch.countDown();
+            }
+
+         });
+         assertTrue(latch.await(1, TimeUnit.SECONDS));
+         assertEquals(1, manager.getActiveTokens().size());
+
+         manager.completeToken();
+         // Poll up to 100 times, sleeping 1 ms between checks, until getActiveTokens() is empty
+         for (int i = 0; i < 100; i++)
+         {
+            // This is asynchronous, so we have to wait for completion
+            if (manager.getActiveTokens().size() == 0)
+            {
+               break;
+            }
+            Thread.sleep(1);
+         }
+
+         assertEquals(0, manager.getActiveTokens().size());
+         manager.stop();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
    class FakeData implements EncodingSupport
    {
 



More information about the hornetq-commits mailing list