[hornetq-commits] JBoss hornetq SVN: r8055 - in branches/Replication_Clebert: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 6 15:44:54 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-06 15:44:53 -0400 (Tue, 06 Oct 2009)
New Revision: 8055
Added:
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Changes & fixes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -22,13 +22,15 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
/**
- * A ReplicatedJournalImpl
+ * Used by the {@link JournalStorageManager} to replicate journal calls.
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
+ * @see JournalStorageManager
*
*/
public class ReplicatedJournalImpl implements Journal
@@ -44,11 +46,6 @@
private final byte journalID;
- /**
- * @param journaID
- * @param replicatedJournal
- * @param replicationManager
- */
public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal, ReplicationManager replicationManager)
{
super();
@@ -72,8 +69,7 @@
*/
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
{
- replicationManager.appendAddRecord(journalID, id, recordType, new ByteArrayEncoding(record));
- replicatedJournal.appendAddRecord(id, recordType, record, sync);
+ this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
/**
@@ -86,6 +82,7 @@
*/
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
{
+ System.out.println("Append record id = " + id + " recordType = " + recordType);
replicationManager.appendAddRecord(journalID, id, recordType, record);
replicatedJournal.appendAddRecord(id, recordType, record, sync);
}
@@ -100,8 +97,7 @@
*/
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
{
- replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
- replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
/**
@@ -114,6 +110,7 @@
*/
public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
{
+ System.out.println("Append record TXid = " + id + " recordType = " + recordType);
replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
}
@@ -126,6 +123,7 @@
*/
public void appendCommitRecord(long txID, boolean sync) throws Exception
{
+ System.out.println("AppendCommit " + txID);
replicationManager.appendCommitRecord(journalID, txID);
replicatedJournal.appendCommitRecord(txID, sync);
}
@@ -138,9 +136,10 @@
*/
public void appendDeleteRecord(long id, boolean sync) throws Exception
{
+ System.out.println("AppendDelete " + id);
replicationManager.appendDeleteRecord(journalID, id);
replicatedJournal.appendDeleteRecord(id, sync);
- }
+ }
/**
* @param txID
@@ -151,8 +150,7 @@
*/
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
{
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id, new ByteArrayEncoding(record));
- replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+ this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
}
/**
@@ -164,6 +162,7 @@
*/
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
{
+ System.out.println("AppendDelete txID=" + txID + " id=" + id);
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
}
@@ -176,6 +175,7 @@
*/
public void appendDeleteRecordTransactional(long txID, long id) throws Exception
{
+ System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
replicatedJournal.appendDeleteRecordTransactional(txID, id);
}
@@ -189,8 +189,7 @@
*/
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
{
- replicationManager.appendPrepareRecord(journalID, txID, new ByteArrayEncoding(transactionData));
- replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+ this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
/**
@@ -202,6 +201,7 @@
*/
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
{
+ System.out.println("AppendPrepare txID=" + txID);
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
}
@@ -214,6 +214,7 @@
*/
public void appendRollbackRecord(long txID, boolean sync) throws Exception
{
+ System.out.println("AppendRollback " + txID);
replicationManager.appendRollbackRecord(journalID, txID);
replicatedJournal.appendRollbackRecord(txID, sync);
}
@@ -228,8 +229,7 @@
*/
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
{
- replicationManager.appendUpdateRecord(journalID, id, recordType, new ByteArrayEncoding(record));
- replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+ this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
/**
@@ -242,6 +242,7 @@
*/
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
{
+ System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
}
@@ -256,8 +257,7 @@
*/
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
{
- replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
- replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
/**
@@ -270,6 +270,7 @@
*/
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
{
+ System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -55,6 +55,8 @@
private Journal messagingJournal;
private JournalStorageManager storage;
+
+ private volatile boolean started;
// Static --------------------------------------------------------
@@ -130,6 +132,8 @@
// We only need to load internal structures on the backup...
storage.loadInternalOnly();
+
+ started = true;
}
/* (non-Javadoc)
@@ -137,6 +141,8 @@
*/
public void stop() throws Exception
{
+ started = false;
+ channel.close();
storage.stop();
}
@@ -161,6 +167,7 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+
/**
* @param packet
*/
@@ -169,8 +176,8 @@
ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
Journal journalToUse = getJournal(commitMessage.getJournalID());
-
+
if (commitMessage.isRollback())
{
journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
@@ -256,6 +263,7 @@
if (addMessage.isUpdate())
{
+ System.out.println("Endpoint appendUpdate id = " + addMessage.getId());
journalToUse.appendUpdateRecord(addMessage.getId(),
addMessage.getRecordType(),
addMessage.getRecordData(),
@@ -263,6 +271,7 @@
}
else
{
+ System.out.println("Endpoint append id = " + addMessage.getId());
journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
}
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -46,7 +46,6 @@
public synchronized void linedUp()
{
pendings++;
- System.out.println("pendings (lined up) = " + pendings);
}
/** To be called by the replication manager, when data is confirmed on the channel */
@@ -68,7 +67,6 @@
/** You may have several actions to be done after a replication operation is completed. */
public synchronized void addReplicationAction(Runnable runnable)
{
- System.out.println("pendings on addFutureCompletion = " + pendings);
if (pendings == 0)
{
executor.execute(runnable);
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -126,6 +126,7 @@
sf.setBlockOnNonPersistentSend(true);
sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
ClientSession createSession = sf.createSession(true, true);
@@ -245,6 +246,8 @@
{
break;
}
+
+ System.out.println("Thread " + Thread.currentThread().getName() + " received " + message.getMessageID());
// There may be some missing or duplicate messages - but the order should be correct
@@ -254,6 +257,7 @@
lastCount = count;
+ System.out.println("Client ACK: " + message.getMessageID());
message.acknowledge();
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-06 14:06:10 UTC (rev 8054)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -193,7 +193,7 @@
producer.send(message);
}
-
+
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.config.Configuration;
+
+/**
+ * A NettyReplicatedFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyReplicatedFailoverTest extends NettyFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ Configuration config1 = super.createDefaultConfig();
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
+ config1.getAcceptorConfigurations().clear();
+ config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(false);
+ config1.setBackup(true);
+ server1Service = super.createServer(true, config1);
+
+ Configuration config0 = super.createDefaultConfig();
+ config0.getAcceptorConfigurations().clear();
+ config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+
+ config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
+ config0.setBackupConnectorName("toBackup");
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(false);
+ server0Service = super.createServer(true, config0);
+
+ server1Service.start();
+ server0Service.start();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java 2009-10-06 19:44:53 UTC (rev 8055)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.config.Configuration;
+
+/**
+ * A ReplicatedAsynchronousFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedAsynchronousFailoverTest extends AsynchronousFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ Configuration config1 = super.createDefaultConfig();
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
+ config1.getAcceptorConfigurations().clear();
+ config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ config1.setSecurityEnabled(false);
+ config1.setSharedStore(false);
+ config1.setBackup(true);
+ server1Service = super.createServer(true, config1);
+
+ Configuration config0 = super.createDefaultConfig();
+ config0.getAcceptorConfigurations().clear();
+ config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+
+ config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
+ config0.setBackupConnectorName("toBackup");
+ config0.setSecurityEnabled(false);
+ config0.setSharedStore(false);
+ server0Service = super.createServer(true, config0);
+
+ server1Service.start();
+ server0Service.start();
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the hornetq-commits mailing list