[hornetq-commits] JBoss hornetq SVN: r8055 - in branches/Replication_Clebert: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 6 15:44:54 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-06 15:44:53 -0400 (Tue, 06 Oct 2009)
New Revision: 8055

Added:
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Changes & fixes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -22,13 +22,15 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.replication.ReplicationManager;
 
 /**
- * A ReplicatedJournalImpl
+ * Used by the {@link JournalStorageManager} to replicate journal calls. 
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
+ * @see JournalStorageManager
  *
  */
 public class ReplicatedJournalImpl implements Journal
@@ -44,11 +46,6 @@
 
    private final byte journalID;
 
-   /**
-    * @param journaID
-    * @param replicatedJournal
-    * @param replicationManager
-    */
    public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal, ReplicationManager replicationManager)
    {
       super();
@@ -72,8 +69,7 @@
     */
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
    {
-      replicationManager.appendAddRecord(journalID, id, recordType, new ByteArrayEncoding(record));
-      replicatedJournal.appendAddRecord(id, recordType, record, sync);
+      this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    /**
@@ -86,6 +82,7 @@
     */
    public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
    {
+      System.out.println("Append record id = " + id + " recordType = " + recordType);
       replicationManager.appendAddRecord(journalID, id, recordType, record);
       replicatedJournal.appendAddRecord(id, recordType, record, sync);
    }
@@ -100,8 +97,7 @@
     */
    public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
    {
-      replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
-      replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+      this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
    }
 
    /**
@@ -114,6 +110,7 @@
     */
    public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
    {
+      System.out.println("Append record TXid = " + id + " recordType = " + recordType);
       replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
       replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
    }
@@ -126,6 +123,7 @@
     */
    public void appendCommitRecord(long txID, boolean sync) throws Exception
    {
+      System.out.println("AppendCommit " + txID);
       replicationManager.appendCommitRecord(journalID, txID);
       replicatedJournal.appendCommitRecord(txID, sync);
    }
@@ -138,9 +136,10 @@
     */
    public void appendDeleteRecord(long id, boolean sync) throws Exception
    {
+      System.out.println("AppendDelete " + id);
       replicationManager.appendDeleteRecord(journalID, id);
       replicatedJournal.appendDeleteRecord(id, sync);
-   }
+    }
 
    /**
     * @param txID
@@ -151,8 +150,7 @@
     */
    public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
    {
-      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, new ByteArrayEncoding(record));
-      replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+      this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
    }
 
    /**
@@ -164,6 +162,7 @@
     */
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
    {
+      System.out.println("AppendDelete txID=" + txID + " id=" + id);
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
       replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
    }
@@ -176,6 +175,7 @@
     */
    public void appendDeleteRecordTransactional(long txID, long id) throws Exception
    {
+      System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
       replicatedJournal.appendDeleteRecordTransactional(txID, id);
    }
@@ -189,8 +189,7 @@
     */
    public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
    {
-      replicationManager.appendPrepareRecord(journalID, txID, new ByteArrayEncoding(transactionData));
-      replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+      this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
    }
 
    /**
@@ -202,6 +201,7 @@
     */
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
    {
+      System.out.println("AppendPrepare txID=" + txID);
       replicationManager.appendPrepareRecord(journalID, txID, transactionData);
       replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
    }
@@ -214,6 +214,7 @@
     */
    public void appendRollbackRecord(long txID, boolean sync) throws Exception
    {
+      System.out.println("AppendRollback " + txID);
       replicationManager.appendRollbackRecord(journalID, txID);
       replicatedJournal.appendRollbackRecord(txID, sync);
    }
@@ -228,8 +229,7 @@
     */
    public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
    {
-      replicationManager.appendUpdateRecord(journalID, id, recordType, new ByteArrayEncoding(record));
-      replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+      this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    /**
@@ -242,6 +242,7 @@
     */
    public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
    {
+      System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       replicationManager.appendUpdateRecord(journalID, id, recordType, record);
       replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
    }
@@ -256,8 +257,7 @@
     */
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
    {
-      replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
-      replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+      this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
    }
 
    /**
@@ -270,6 +270,7 @@
     */
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
    {
+      System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
       replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
       replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
    }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -55,6 +55,8 @@
    private Journal messagingJournal;
 
    private JournalStorageManager storage;
+   
+   private volatile boolean started;
 
    // Static --------------------------------------------------------
 
@@ -130,6 +132,8 @@
 
       // We only need to load internal structures on the backup...
       storage.loadInternalOnly();
+
+      started = true;
    }
 
    /* (non-Javadoc)
@@ -137,6 +141,8 @@
     */
    public void stop() throws Exception
    {
+      started = false;
+      channel.close();
       storage.stop();
    }
 
@@ -161,6 +167,7 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+
    /**
     * @param packet
     */
@@ -169,8 +176,8 @@
       ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
 
       Journal journalToUse = getJournal(commitMessage.getJournalID());
-
       
+      
       if (commitMessage.isRollback())
       {
          journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
@@ -256,6 +263,7 @@
 
       if (addMessage.isUpdate())
       {
+         System.out.println("Endpoint appendUpdate id = "  + addMessage.getId());
          journalToUse.appendUpdateRecord(addMessage.getId(),
                                          addMessage.getRecordType(),
                                          addMessage.getRecordData(),
@@ -263,6 +271,7 @@
       }
       else
       {
+         System.out.println("Endpoint append id = "  + addMessage.getId());
          journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
       }
    }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -46,7 +46,6 @@
    public synchronized void linedUp()
    {
       pendings++;
-      System.out.println("pendings (lined up) = " + pendings);
    }
 
    /** To be called by the replication manager, when data is confirmed on the channel */
@@ -68,7 +67,6 @@
    /** You may have several actions to be done after a replication operation is completed. */
    public synchronized void addReplicationAction(Runnable runnable)
    {
-      System.out.println("pendings on addFutureCompletion = " + pendings);
       if (pendings == 0)
       {
          executor.execute(runnable);

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -126,6 +126,7 @@
    
             sf.setBlockOnNonPersistentSend(true);
             sf.setBlockOnPersistentSend(true);
+            sf.setBlockOnAcknowledge(true);
    
             ClientSession createSession = sf.createSession(true, true);
    
@@ -245,6 +246,8 @@
             {
                break;
             }
+            
+            System.out.println("Thread " + Thread.currentThread().getName() + " received " + message.getMessageID());
 
             // There may be some missing or duplicate messages - but the order should be correct
 
@@ -254,6 +257,7 @@
 
             lastCount = count;
 
+            System.out.println("Client ACK: " + message.getMessageID());
             message.acknowledge();
          }
 

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -193,7 +193,7 @@
 
          producer.send(message);
       }
-
+      
       RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
 
       // Simulate failure on connection

Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java	                        (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.config.Configuration;
+
+/**
+ * Runs the {@link NettyFailoverTest} scenarios against a replicated live/backup pair (shared store disabled).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyReplicatedFailoverTest extends NettyFailoverTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      Configuration config1 = super.createDefaultConfig();
+      config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
+      config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
+      config1.getAcceptorConfigurations().clear();
+      config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+      config1.setSecurityEnabled(false);
+      config1.setSharedStore(false);
+      config1.setBackup(true);
+      server1Service = super.createServer(true, config1);
+
+      Configuration config0 = super.createDefaultConfig();
+      config0.getAcceptorConfigurations().clear();
+      config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+
+      config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
+      config0.setBackupConnectorName("toBackup");
+      config0.setSecurityEnabled(false);
+      config0.setSharedStore(false);
+      server0Service = super.createServer(true, config0);
+
+      server1Service.start();
+      server0Service.start();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java	                        (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java	2009-10-06 19:44:53 UTC (rev 8055)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.config.Configuration;
+
+/**
+ * Runs the {@link AsynchronousFailoverTest} scenarios against a replicated live/backup pair (shared store disabled).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedAsynchronousFailoverTest extends AsynchronousFailoverTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      Configuration config1 = super.createDefaultConfig();
+      config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
+      config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
+      config1.getAcceptorConfigurations().clear();
+      config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+      config1.setSecurityEnabled(false);
+      config1.setSharedStore(false);
+      config1.setBackup(true);
+      server1Service = super.createServer(true, config1);
+
+      Configuration config0 = super.createDefaultConfig();
+      config0.getAcceptorConfigurations().clear();
+      config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+
+      config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
+      config0.setBackupConnectorName("toBackup");
+      config0.setSecurityEnabled(false);
+      config0.setSharedStore(false);
+      server0Service = super.createServer(true, config0);
+
+      server1Service.start();
+      server0Service.start();
+   }
+
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list