Author: clebert.suconic@jboss.com
Date: 2009-11-03 11:55:02 -0500 (Tue, 03 Nov 2009)
New Revision: 8195
Added:
trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-125 - Replication work
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,7 +15,6 @@
import java.util.List;
-import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -77,12 +76,19 @@
// Load
- long load(LoaderCallback reloadManager) throws Exception;
+ JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
+   /** Load internal data structures without exposing any data.
+    * This is only useful if you're using the journal but are not interested in the current data.
+    * Useful in situations where the journal is being replicated, copied, etc. */
+   JournalLoadInformation loadInternalOnly() throws Exception;
-   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
+   JournalLoadInformation load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
+
int getAlignment() throws Exception;
+
+ int getNumberOfRecords();
void perfBlast(int pages) throws Exception;
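
The Journal interface now reports what a load actually saw instead of just the max record ID. A minimal sketch of a caller consuming the new return values; "journal" is an assumed, already-started Journal implementation and the variable names are illustrative:

   List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
   List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();

   // Passing null for the TransactionFailureCallback mirrors how the bindings
   // journal is loaded elsewhere in this commit.
   JournalLoadInformation info = journal.load(committedRecords, preparedTransactions, null);
   System.out.println("loaded " + info.getNumberOfRecords() + " records, max ID = " + info.getMaxID());

   // A backup that only needs the journal primed for appends skips exposing data:
   JournalLoadInformation backupInfo = journal.loadInternalOnly();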
Added: trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+/**
+ * This is a POJO containing information about the journal during load time.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalLoadInformation
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private int numberOfRecords = 0;
+
+ private long maxID = -1;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JournalLoadInformation()
+ {
+ super();
+ }
+
+ /**
+ * @param numberOfRecords
+ * @param maxID
+ */
+ public JournalLoadInformation(final int numberOfRecords, final long maxID)
+ {
+ super();
+ this.numberOfRecords = numberOfRecords;
+ this.maxID = maxID;
+ }
+
+
+
+
+ // Public --------------------------------------------------------
+
+
+ /**
+ * @return the numberOfRecords
+ */
+ public int getNumberOfRecords()
+ {
+ return numberOfRecords;
+ }
+
+ /**
+ * @param numberOfRecords the numberOfRecords to set
+ */
+ public void setNumberOfRecords(final int numberOfRecords)
+ {
+ this.numberOfRecords = numberOfRecords;
+ }
+
+ /**
+ * @return the maxID
+ */
+ public long getMaxID()
+ {
+ return maxID;
+ }
+
+ /**
+ * @param maxID the maxID to set
+ */
+ public void setMaxID(final long maxID)
+ {
+ this.maxID = maxID;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int)(maxID ^ (maxID >>> 32));
+ result = prime * result + numberOfRecords;
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ JournalLoadInformation other = (JournalLoadInformation)obj;
+ if (maxID != other.maxID)
+ return false;
+ if (numberOfRecords != other.numberOfRecords)
+ return false;
+ return true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+      return "JournalLoadInformation [maxID=" + maxID + ", numberOfRecords=" + numberOfRecords + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
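
Because the replication comparison in this commit relies on this class's value equality, a short sketch with hypothetical values showing how two instances compare:

   JournalLoadInformation live = new JournalLoadInformation(100, 2000L);
   JournalLoadInformation backup = new JournalLoadInformation(100, 2000L);

   // equals() checks numberOfRecords and maxID, so identical journals match
   assert live.equals(backup);

   backup.setMaxID(2001L); // an extra record appended on the backup only
   assert !live.equals(backup);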
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -44,6 +45,7 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -1163,7 +1165,7 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
*/
@@ -1172,8 +1174,6 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
-
-
/**
*
    * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -1360,19 +1360,48 @@
return fileFactory.getAlignment();
}
+ public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ LoaderCallback dummyLoader = new LoaderCallback()
+ {
+
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ };
+
+ return this.load(dummyLoader);
+ }
+
/**
* @see JournalImpl#load(LoaderCallback)
*/
-   public synchronized long load(final List<RecordInfo> committedRecords,
-                                 final List<PreparedTransactionInfo> preparedTransactions,
-                                 final TransactionFailureCallback failureCallback) throws Exception
+   public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
+                                                   final List<PreparedTransactionInfo> preparedTransactions,
+                                                   final TransactionFailureCallback failureCallback) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
final int DELETE_FLUSH = 20000;
- long maxID = load(new LoaderCallback()
+ JournalLoadInformation info = load(new LoaderCallback()
{
         public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
{
@@ -1429,7 +1458,7 @@
}
}
- return maxID;
+ return info;
}
/**
@@ -1649,7 +1678,7 @@
    * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
*
* */
- public synchronized long load(final LoaderCallback loadManager) throws Exception
+   public synchronized JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
{
if (state != STATE_STARTED)
{
@@ -1676,7 +1705,8 @@
int lastDataPos = SIZE_HEADER;
- long maxID = -1;
+ final AtomicLong maxID = new AtomicLong(-1);
for (final JournalFile file : orderedFiles)
{
@@ -1687,12 +1717,23 @@
         int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
{
+ private void checkID(final long id)
+ {
+ if (id > maxID.longValue())
+ {
+ maxID.set(id);
+ }
+ }
+
public void onReadAddRecord(final RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
trace("AddRecord: " + info);
}
+
+ checkID(info.id);
+
hasData.set(true);
loadManager.addRecord(info);
@@ -1706,6 +1747,9 @@
{
trace("UpdateRecord: " + info);
}
+
+ checkID(info.id);
+
hasData.set(true);
loadManager.updateRecord(info);
@@ -1753,6 +1797,8 @@
                  trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
}
+ checkID(info.id);
+
hasData.set(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -2034,11 +2080,21 @@
// Remove the transactionInfo
transactions.remove(transaction.transactionID);
-
-            loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
+
+ loadManager.failedTransaction(transaction.transactionID,
+ transaction.recordInfos,
+ transaction.recordsToDelete);
}
else
{
+ for (RecordInfo info : transaction.recordInfos)
+ {
+ if (info.id > maxID.get())
+ {
+ maxID.set(info.id);
+ }
+ }
+
               PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.records.addAll(transaction.recordInfos);
@@ -2053,7 +2109,7 @@
checkReclaimStatus();
- return maxID;
+ return new JournalLoadInformation(records.size(), maxID.longValue());
}
/**
@@ -2531,6 +2587,11 @@
}
}
+ public int getNumberOfRecords()
+ {
+ return this.records.size();
+ }
+
// Public
// -----------------------------------------------------------------------------
@@ -2854,7 +2915,7 @@
currentFile.getFile().write(bb, sync);
}
- return currentFile;
+ return currentFile;
}
finally
{
@@ -3355,7 +3416,7 @@
{
private static NullEncoding instance = new NullEncoding();
-
+
public static NullEncoding getInstance()
{
return instance;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -40,7 +40,6 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerProducerCreditManager;
import org.hornetq.core.server.impl.ServerProducerCreditManagerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -971,6 +970,9 @@
}
depageTransaction.commit();
+
+         // StorageManager does the check: if (replicated) -> do the proper cleanup already
+ storageManager.completeReplication();
if (isTrace)
{
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -18,6 +18,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
@@ -119,10 +120,10 @@
   /** This method is only useful on the backup side. We only load internal structures, making the journals ready for
    * append mode on the backup side. */
- void loadInternalOnly() throws Exception;
+ JournalLoadInformation[] loadInternalOnly() throws Exception;
- public void loadMessageJournal(final PostOffice postOffice,
+ JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
@@ -139,7 +140,7 @@
void deleteQueueBinding(long queueBindingID) throws Exception;
-   void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
+   JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
   // grouping related operations
void addGrouping(GroupBinding groupBinding) throws Exception;
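
On the backup, loadInternalOnly() now returns one JournalLoadInformation per journal. A minimal sketch, assuming "storage" is a started StorageManager; index 0 is the bindings journal and index 1 the message journal, per the JournalStorageManager implementation below:

   JournalLoadInformation[] backupState = storage.loadInternalOnly();

   JournalLoadInformation bindingsState = backupState[0]; // bindings journal
   JournalLoadInformation messagesState = backupState[1]; // message journal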
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -38,7 +38,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
@@ -678,7 +678,7 @@
}
- public void loadMessageJournal(final PostOffice postOffice,
+ public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
@@ -690,7 +690,7 @@
      Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-      messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
+      JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
      ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
@@ -919,6 +919,8 @@
{
messageJournal.perfBlast(perfBlastPages);
}
+
+ return info;
}
/**
@@ -1189,13 +1191,13 @@
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
-   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
+   public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
   {
      List<RecordInfo> records = new ArrayList<RecordInfo>();
      List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
-      bindingsJournal.load(records, preparedTransactions, null);
+      JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
for (RecordInfo record : records)
{
@@ -1239,6 +1241,8 @@
throw new IllegalStateException("Invalid record type " + rec);
}
}
+
+ return bindingsInfo;
}
// HornetQComponent implementation
@@ -1296,34 +1300,13 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
*/
- public void loadInternalOnly() throws Exception
+ public JournalLoadInformation[] loadInternalOnly() throws Exception
{
- LoaderCallback dummyLoader = new LoaderCallback()
- {
-
-      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
- {
- }
-
- public void updateRecord(RecordInfo info)
- {
- }
-
- public void deleteRecord(long id)
- {
- }
-
- public void addRecord(RecordInfo info)
- {
- }
-
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
- {
- }
- };
-
- bindingsJournal.load(dummyLoader);
- messageJournal.load(dummyLoader);
+ JournalLoadInformation[] info = new JournalLoadInformation[2];
+ info[0] = bindingsJournal.loadInternalOnly();
+ info[1] = messageJournal.loadInternalOnly();
+
+ return info;
}
   // Public -----------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -20,16 +20,17 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.logging.Logger;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -50,8 +51,6 @@
*/
public class NullStorageManager implements StorageManager
{
- private static final Logger log = Logger.getLogger(NullStorageManager.class);
-
private final AtomicLong idSequence = new AtomicLong(0);
private UUID id;
@@ -80,9 +79,9 @@
{
}
-   public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+   public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
-
+ return new JournalLoadInformation();
}
public void prepare(final long txID, final Xid xid) throws Exception
@@ -252,12 +251,13 @@
{
}
- public void loadMessageJournal(PostOffice postOffice,
+ public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
                                                    Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
+ return new JournalLoadInformation();
}
   public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
@@ -271,8 +271,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
*/
- public void loadInternalOnly() throws Exception
+ public JournalLoadInformation[] loadInternalOnly() throws Exception
{
+ return null;
}
/* (non-Javadoc)
@@ -334,4 +335,12 @@
{
}
+ /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
+ */
+ public void setReplicator(ReplicationManager replicator)
+ {
+      throw new IllegalStateException("Null persistence should never be used with replication");
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -28,6 +28,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -91,6 +92,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -457,6 +459,11 @@
packet = new ReplicationLargeMessageWriteMessage();
break;
}
+ case REPLICATION_COMPARE_DATA:
+ {
+ packet = new ReplicationCompareDataMessage();
+ break;
+ }
case SESS_FORCE_CONSUMER_DELIVERY:
{
packet = new SessionForceConsumerDelivery();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -170,6 +170,8 @@
public static final byte REPLICATION_LARGE_MESSAGE_END = 90;
public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
+
+ public static final byte REPLICATION_COMPARE_DATA = 92;
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Message used to verify that the journals on the live and backup nodes
+ * are equivalent and can be used for replication.
+ * The backup journal needs to be an exact copy of the live node's journal before it starts.
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationCompareDataMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private JournalLoadInformation[] journalInformation;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+   public ReplicationCompareDataMessage(final JournalLoadInformation[] journalInformation)
+ {
+ super(REPLICATION_COMPARE_DATA);
+
+ this.journalInformation = journalInformation;
+ }
+
+ public ReplicationCompareDataMessage()
+ {
+ super(REPLICATION_COMPARE_DATA);
+ }
+
+ // Public --------------------------------------------------------
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+             DataConstants.SIZE_INT + (journalInformation.length * (DataConstants.SIZE_INT + DataConstants.SIZE_LONG)) +
+ DataConstants.SIZE_INT;
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(journalInformation.length);
+ for (JournalLoadInformation info : journalInformation)
+ {
+ buffer.writeInt(info.getNumberOfRecords());
+ buffer.writeLong(info.getMaxID());
+ }
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ int numberOfJournals = buffer.readInt();
+
+ this.journalInformation = new JournalLoadInformation[numberOfJournals];
+
+ for (int i = 0; i < numberOfJournals; i++)
+ {
+ this.journalInformation[i] = new JournalLoadInformation();
+ this.journalInformation[i].setNumberOfRecords(buffer.readInt());
+ this.journalInformation[i].setMaxID(buffer.readLong());
+ }
+ }
+
+ /**
+ * @return the journalInformation
+ */
+ public JournalLoadInformation[] getJournalInformation()
+ {
+ return journalInformation;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
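
The wire format is a journal count followed by an int/long pair (record count, max ID) per journal. A sketch of the live node driving the comparison; "replicationManager" is its assumed started ReplicationManager, and "bindingsInfo"/"messagesInfo" are hypothetical names standing in for the values returned by the two journal loads in this commit:

   JournalLoadInformation[] liveState = new JournalLoadInformation[] { bindingsInfo, messagesInfo };

   // Sends a blocking REPLICATION_COMPARE_DATA packet; the backup answers with a
   // NullResponseMessage on success or a HornetQExceptionMessage on mismatch.
   replicationManager.compareJournals(liveState);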
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
@@ -30,5 +32,7 @@
void setChannel(Channel channel);
Channel getChannel();
+
+   void compareJournalInformation(JournalLoadInformation[] journalInformation) throws HornetQException;
}
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,7 +15,9 @@
import java.util.Set;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.utils.SimpleString;
@@ -80,4 +82,10 @@
void largeMessageDelete(long messageId);
+ /**
+ * @param journalInfo
+ * @throws HornetQException
+ */
+ void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -17,6 +17,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -44,7 +45,12 @@
// Attributes ----------------------------------------------------
- private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = false;
+
+ private static void trace(String message)
+ {
+ System.out.println("ReplicatedJournal::" + message);
+ }
private final ReplicationManager replicationManager;
@@ -64,10 +70,6 @@
// Static --------------------------------------------------------
- private static void trace(String message)
- {
- log.trace(message);
- }
// Constructors --------------------------------------------------
@@ -335,7 +337,7 @@
* @throws Exception
    * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
*/
- public long load(final List<RecordInfo> committedRecords,
+ public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
                    final TransactionFailureCallback transactionFailure) throws Exception
{
@@ -348,7 +350,7 @@
* @throws Exception
    * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
*/
- public long load(final LoaderCallback reloadManager) throws Exception
+   public JournalLoadInformation load(final LoaderCallback reloadManager) throws Exception
{
return localJournal.load(reloadManager);
}
@@ -397,6 +399,22 @@
return localJournal.isStarted();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ return localJournal.loadInternalOnly();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getNumberOfRecords()
+ */
+ public int getNumberOfRecords()
+ {
+ return localJournal.getNumberOfRecords();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -14,7 +14,8 @@
package org.hornetq.core.replication.impl;
import java.util.ArrayList;
-import java.util.concurrent.Executor;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.replication.ReplicationContext;
@@ -27,75 +28,78 @@
*/
public class ReplicationContextImpl implements ReplicationContext
{
- final Executor executor;
+ private List<Runnable> tasks;
- private ArrayList<Runnable> tasks;
+ private AtomicInteger pendings = new AtomicInteger(0);
- private volatile int pendings;
+ private volatile boolean complete = false;
/**
* @param executor
*/
- public ReplicationContextImpl(Executor executor)
+ public ReplicationContextImpl()
{
super();
- this.executor = executor;
}
   /** To be called by the replication manager, when new replication is added to the queue */
- public synchronized void linedUp()
+ public void linedUp()
{
- pendings++;
+ pendings.incrementAndGet();
}
+   /** You may have several actions to be done after a replication operation is completed. */
+ public void addReplicationAction(Runnable runnable)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need to use Concurrent, we only add from a single thread.
+ // We don't add any more Runnables after it is complete
+ tasks = new ArrayList<Runnable>();
+ }
+
+ tasks.add(runnable);
+ }
+
/** To be called by the replication manager, when data is confirmed on the channel */
public synchronized void replicated()
{
- if (--pendings == 0)
+ if (pendings.decrementAndGet() == 0 && complete)
{
flush();
}
}
- /**
- *
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
*/
- public void flush()
+ public synchronized void complete()
{
+ complete = true;
+ if (pendings.get() == 0 && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
if (tasks != null)
{
for (Runnable run : tasks)
{
- executor.execute(run);
+ run.run();
}
tasks.clear();
}
}
-   /** You may have several actions to be done after a replication operation is completed. */
- public synchronized void addReplicationAction(Runnable runnable)
- {
- if (pendings == 0)
- {
- executor.execute(runnable);
- }
- else
- {
- if (tasks == null)
- {
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public void complete()
- {
- // TODO Auto-generated method stub
-
- }
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -16,12 +16,15 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -31,10 +34,13 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -84,6 +90,8 @@
private PagingManager pageManager;
+ private JournalLoadInformation[] journalLoadInformation;
+
   private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
   private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
@@ -101,6 +109,8 @@
*/
public void handlePacket(final Packet packet)
{
+ PacketImpl response = new ReplicationResponseMessage();
+
try
{
if (packet.getType() == PacketImpl.REPLICATION_APPEND)
@@ -147,6 +157,11 @@
{
handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
}
+ else if (packet.getType() == REPLICATION_COMPARE_DATA)
+ {
+ handleCompareDataMessage((ReplicationCompareDataMessage)packet);
+ response = new NullResponseMessage();
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
@@ -154,10 +169,10 @@
}
catch (Exception e)
{
-         // TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage((HornetQException)e);
}
- channel.send(new ReplicationResponseMessage());
+ channel.send(response);
}
/* (non-Javadoc)
@@ -182,7 +197,7 @@
messagingJournal = storage.getMessageJournal();
// We only need to load internal structures on the backup...
- storage.loadInternalOnly();
+ journalLoadInformation = storage.loadInternalOnly();
pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
server.getExecutorFactory()),
@@ -199,7 +214,12 @@
*/
public void stop() throws Exception
{
- channel.close();
+ // This could be null if the backup server is being
+ // shut down without any live server connecting here
+ if (channel != null)
+ {
+ channel.close();
+ }
storage.stop();
for (ConcurrentMap<Integer, Page> map : pageIndex.values())
@@ -243,6 +263,52 @@
this.channel = channel;
}
+   public void compareJournalInformation(JournalLoadInformation[] journalInformation) throws HornetQException
+   {
+      if (this.journalLoadInformation == null || this.journalLoadInformation.length != journalInformation.length)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "The number of journals on the live node doesn't match the backup node. Probably a version mismatch error");
+      }
+
+ for (int i = 0; i < journalInformation.length; i++)
+ {
+ if (!journalInformation[i].equals(this.journalLoadInformation[i]))
+ {
+            log.warn("Journal comparison mismatch:\n" + journalParametersToString(journalInformation));
+            throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                       "Backup node can't connect to the live node as the data differs");
+ }
+ }
+
+ }
+
+ /**
+ * @param journalInformation
+ */
+ private String journalParametersToString(JournalLoadInformation[] journalInformation)
+ {
+ return "**********************************************************\n" +
+ "parameters:\n" +
+ "Bindings = " +
+ journalInformation[0] +
+ "\n" +
+ "Messaging = " +
+ journalInformation[1] +
+ "\n" +
+ "**********************************************************" +
+ "\n" +
+ "Expected:" +
+ "\n" +
+ "Bindings = " +
+ this.journalLoadInformation[0] +
+ "\n" +
+ "Messaging = " +
+ this.journalLoadInformation[1] +
+ "\n" +
+ "**********************************************************";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -279,6 +345,15 @@
message.addBytes(packet.getBody());
}
}
+
+ /**
+ * @param request
+ */
+   private void handleCompareDataMessage(ReplicationCompareDataMessage request) throws HornetQException
+ {
+ compareJournalInformation(request.getJournalInformation());
+ }
+
private LargeServerMessage lookupLargeMessage(long messageId, boolean delete)
{
@@ -301,7 +376,6 @@
return message;
}
-
/**
* @param packet
*/
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -22,6 +22,7 @@
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.remoting.Channel;
@@ -33,6 +34,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -86,7 +88,7 @@
   private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
   private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -304,8 +306,19 @@
*/
public synchronized void start() throws Exception
{
+ if (started)
+ {
+         throw new IllegalStateException("ReplicationManager is already started");
+ }
connection = failoverManager.getConnection();
+ if (connection == null)
+ {
+         log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
+ }
+
long channelID = connection.generateChannelID();
Channel mainChannel = connection.getChannel(1, -1);
@@ -381,7 +394,7 @@
ReplicationContext token = tlReplicationContext.get();
if (token == null)
{
- token = new ReplicationContextImpl(executor);
+ token = new ReplicationContextImpl();
activeContexts.add(token);
tlReplicationContext.set(token);
}
@@ -414,6 +427,7 @@
activeContexts.remove(token);
}
});
+ token.complete();
}
}
@@ -455,7 +469,16 @@
repliToken.replicated();
}
}
+
+ /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
+ */
+   public void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException
+ {
+ replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
+ }
+
private void replicated()
{
ReplicationContext tokenPolled = pendingTokens.poll();
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -20,6 +20,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -73,7 +74,9 @@
   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
- ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
+   /** The journal on the backup server has to be equivalent to the journal used on the live node;
+    * otherwise the backup node is out of sync. */
+ ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -23,7 +23,6 @@
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
@@ -145,40 +144,41 @@
tx.commit();
-
- Runnable action = new Runnable()
+ if (storageManager.isReplicated())
{
- public void run()
+ storageManager.afterReplicated(new Runnable()
{
-
- count++;
-
- if (count == batchSize)
+ public void run()
{
-            // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
-            // long time in the case there are many messages in the queue
- active = false;
-
-
- executor.execute(new Prompter());
-
- count = 0;
+ execPrompter();
}
-
- }
- };
-
- if (storageManager.isReplicated())
- {
- storageManager.afterReplicated(action);
+ });
storageManager.completeReplication();
}
else
{
- action.run();
+ execPrompter();
}
}
+
+ private void execPrompter()
+ {
+ count++;
+
+ if (count == batchSize)
+ {
+         // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+         // long time in the case there are many messages in the queue
+ active = false;
+
+ executor.execute(new Prompter());
+
+ count = 0;
+ }
+
+ }
+
private class Prompter implements Runnable
{
public void run()
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,8 +15,8 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
@@ -194,7 +194,7 @@
{
      Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
- ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
+ ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
channel.setHandler(endpoint);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -50,6 +50,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
import org.hornetq.core.logging.Logger;
@@ -267,13 +268,14 @@
{
initialiseLogging();
-      log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
-
      if (started)
      {
+         log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
         return;
      }
+      log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
+
if (configuration.isRunSyncSpeedTest())
{
SyncSpeedTest test = new SyncSpeedTest();
@@ -285,6 +287,11 @@
if (configuration.isBackup())
{
+ if (!configuration.isSharedStore())
+ {
+ this.replicationEndpoint = new ReplicationEndpointImpl(this);
+ this.replicationEndpoint.start();
+ }
         // We defer actual initialisation until the live node has contacted the backup
log.info("Backup server initialised");
}
@@ -658,19 +665,20 @@
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
-   public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
+   public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
{
         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
}
- if (replicationEndpoint == null)
+ if (replicationEndpoint.getChannel() != null)
{
- replicationEndpoint = new ReplicationEndpointImpl(this);
- replicationEndpoint.setChannel(channel);
- replicationEndpoint.start();
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup replication server is already connected to another server");
}
+
+ replicationEndpoint.setChannel(channel);
+
return replicationEndpoint;
}
@@ -891,7 +899,7 @@
{
String backupConnectorName = configuration.getBackupConnectorName();
- if (backupConnectorName != null)
+ if (!configuration.isSharedStore() && backupConnectorName != null)
{
         TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
@@ -1086,9 +1094,13 @@
}
}
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
+
// Load the journal and populate queues, transactions and caches in memory
- loadJournal();
+ JournalLoadInformation[] journalInfo = loadJournals();
+
+ compareJournals(journalInfo);
+
      // Deploy any queues in the Configuration class - if there's no file deployment we still need
// to load those
deployQueuesFromConfiguration();
@@ -1149,6 +1161,17 @@
initialised = true;
}
+ /**
+ * @param journalInfo
+ */
+ private void compareJournals(JournalLoadInformation[] journalInfo) throws Exception
+ {
+ if (replicationManager != null)
+ {
+ replicationManager.compareJournals(journalInfo);
+ }
+ }
+
private void deployQueuesFromConfiguration() throws Exception
{
for (QueueConfiguration config : configuration.getQueueConfigurations())
@@ -1160,13 +1183,15 @@
}
}
- private void loadJournal() throws Exception
+ private JournalLoadInformation[] loadJournals() throws Exception
{
+ JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
+
      List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
- storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+      journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
      // Set the node id - must be before we load the queues into the postoffice, but after we load the journal
setNodeID();
@@ -1206,7 +1231,7 @@
      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
-      storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
+      journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
      for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
{
@@ -1219,6 +1244,8 @@
cache.load(entry.getValue());
}
}
+
+ return journalInfo;
}
private void setNodeID() throws Exception
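
Taken together, the server changes reorder startup as follows; a condensed sketch with control flow simplified from HornetQServerImpl:

   // Backup (non-shared-store): start the endpoint early and wait for the live node
   if (configuration.isBackup() && !configuration.isSharedStore())
   {
      replicationEndpoint = new ReplicationEndpointImpl(this);
      replicationEndpoint.start(); // primes the journals via loadInternalOnly()
   }

   // Live: load both journals, then verify the backup holds identical data
   JournalLoadInformation[] journalInfo = loadJournals();
   if (replicationManager != null)
   {
      replicationManager.compareJournals(journalInfo); // throws HornetQException on mismatch
   }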
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -853,6 +853,7 @@
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
backupConf.setClustered(true);
+ backupConf.setSharedStore(true);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
                .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -873,6 +874,7 @@
connectors.put(liveTC.getName(), liveTC);
liveConf.setConnectorConfigurations(connectors);
liveConf.setBackupConnectorName(backupTC.getName());
+ liveConf.setSharedStore(true);
liveConf.setClustered(true);
      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -29,7 +29,6 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
@@ -37,6 +36,7 @@
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -45,11 +45,10 @@
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
-import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
@@ -68,7 +67,7 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
- private static final int[] PORTS = {TransportConstants.DEFAULT_PORT,
+ private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
TransportConstants.DEFAULT_PORT + 3,
@@ -77,8 +76,7 @@
TransportConstants.DEFAULT_PORT + 6,
TransportConstants.DEFAULT_PORT + 7,
TransportConstants.DEFAULT_PORT + 8,
- TransportConstants.DEFAULT_PORT + 9,
- };
+ TransportConstants.DEFAULT_PORT + 9, };
private static final long WAIT_TIMEOUT = 10000;
@@ -136,7 +134,7 @@
protected HornetQServer[] servers = new HornetQServer[MAX_SERVERS];
- private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+ protected ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
   protected void waitForMessages(int node, final String address, final int count) throws Exception
{
@@ -169,11 +167,11 @@
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
-      //System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+      // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
      throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
-                                     ", expecting = " +
-                                     count);
+                                      ", expecting = " +
+                                      count);
}
protected void waitForServerRestart(int node) throws Exception
@@ -181,7 +179,7 @@
long start = System.currentTimeMillis();
do
{
- if(servers[node].isInitialised())
+ if (servers[node].isInitialised())
{
return;
}
@@ -201,15 +199,15 @@
final int consumerCount,
final boolean local) throws Exception
{
-// log.info("waiting for bindings on node " + node +
-// " address " +
-// address +
-// " count " +
-// count +
-// " consumerCount " +
-// consumerCount +
-// " local " +
-// local);
+ System.out.println("waiting for bindings on node " + node +
+ " address " +
+ address +
+ " count " +
+ count +
+ " consumerCount " +
+ consumerCount +
+ " local " +
+ local);
HornetQServer server = this.servers[node];
if (server == null)
@@ -237,7 +235,7 @@
{
            if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
{
- QueueBinding qBinding = (QueueBinding) binding;
+ QueueBinding qBinding = (QueueBinding)binding;
bindingCount++;
@@ -245,7 +243,7 @@
}
}
-         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+         // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
@@ -260,8 +258,8 @@
      // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
-                  ", totConsumers = " +
-                  totConsumers;
+                   ", totConsumers = " +
+                   totConsumers;
log.error(msg);
@@ -438,12 +436,23 @@
session.close();
}
-   protected void sendWithProperty(int node, String address, int numMessages, boolean durable, SimpleString key, SimpleString val) throws Exception
+ protected void sendWithProperty(int node,
+ String address,
+ int numMessages,
+ boolean durable,
+ SimpleString key,
+ SimpleString val) throws Exception
{
sendInRange(node, address, 0, numMessages, durable, key, val);
}
-   protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
+ protected void sendInRange(int node,
+ String address,
+ int msgStart,
+ int msgEnd,
+ boolean durable,
+ SimpleString key,
+ SimpleString val) throws Exception
{
ClientSessionFactory sf = this.sfs[node];
@@ -475,8 +484,11 @@
   protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
{
-      this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
-            new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), type, new SimpleString("queues"), timeout));
+      this.servers[node].getConfiguration()
+                        .setGroupingHandlerConfiguration(new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"),
+                                                                                          type,
+                                                                                          new SimpleString("queues"),
+                                                                                          timeout));
}
protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
@@ -499,17 +511,12 @@
verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
}
- protected void verifyReceiveAllWithGroupIDRoundRobin(
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+   protected void verifyReceiveAllWithGroupIDRoundRobin(int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
}
- protected int verifyReceiveAllOnSingleConsumer(int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+   protected int verifyReceiveAllOnSingleConsumer(int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
}
@@ -553,7 +560,7 @@
assertTrue("Message received too soon",
System.currentTimeMillis() >= firstReceiveTime);
}
-               SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+               SimpleString id = (SimpleString)message.getProperty(MessageImpl.HDR_GROUP_ID);
System.out.println("received " + id + " on consumer " +
consumerIDs[i]);
if (groupIdsReceived.get(id) == null)
{
@@ -561,20 +568,20 @@
}
else if (groupIdsReceived.get(id) != i)
{
-                  fail("consumer " + groupIdsReceived.get(id) + " already bound to groupid " + id + " received on consumer " + i);
+ fail("consumer " + groupIdsReceived.get(id) +
+ " already bound to groupid " +
+ id +
+ " received on consumer " +
+ i);
}
}
}
-
}
- protected int verifyReceiveAllOnSingleConsumer(boolean ack,
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+   protected int verifyReceiveAllOnSingleConsumer(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
int groupIdsReceived = -1;
for (int i = 0; i < consumerIDs.length; i++)
@@ -649,7 +656,7 @@
assertTrue("Message received too soon",
System.currentTimeMillis() >= firstReceiveTime);
}
- if (j != (Integer) (message.getProperty(COUNT_PROP)))
+ if (j != (Integer)(message.getProperty(COUNT_PROP)))
{
outOfOrder = true;
System.out.println("Message j=" + j + " was received out of
order = " + message.getProperty(COUNT_PROP));
@@ -707,8 +714,8 @@
if (message != null)
{
log.info("check receive Consumer " + consumerIDs[i] +
- " received message " +
- message.getProperty(COUNT_PROP));
+ " received message " +
+ message.getProperty(COUNT_PROP));
}
else
{
@@ -779,7 +786,7 @@
if (message != null)
{
- int count = (Integer) message.getProperty(COUNT_PROP);
+ int count = (Integer)message.getProperty(COUNT_PROP);
Integer prevCount = countMap.get(i);
@@ -799,7 +806,7 @@
message.acknowledge();
}
-            //log.info("consumer " + consumerIDs[i] +" returns " + count);
+            // log.info("consumer " + consumerIDs[i] +" returns " + count);
}
else
{
@@ -841,7 +848,7 @@
if (message != null)
{
- int count = (Integer) message.getProperty(COUNT_PROP);
+ int count = (Integer)message.getProperty(COUNT_PROP);
// log.info("consumer " + consumerIDs[i] + " received
message " + count);
@@ -889,7 +896,7 @@
assertNotNull(list);
- int elem = (Integer) list.poll();
+ int elem = (Integer)list.poll();
assertEquals(messageCounts[i], elem);
@@ -929,7 +936,7 @@
if (message != null)
{
- int count = (Integer) message.getProperty(COUNT_PROP);
+ int count = (Integer)message.getProperty(COUNT_PROP);
ints.add(count);
}
@@ -1011,18 +1018,21 @@
serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
}
- Map<String, Object> backupParams = generateParams(backupNode, netty);
+ TransportConfiguration serverBackuptc = null;
- TransportConfiguration serverBackuptc;
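+      // Only build a backup transport configuration when the node actually has a backup (backupNode != -1)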
+ if (backupNode != -1)
+ {
+ Map<String, Object> backupParams = generateParams(backupNode, netty);
-      if (netty)
-      {
-         serverBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+         if (netty)
+         {
+            serverBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+         }
+         else
+         {
+            serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+         }
      }
-      else
-      {
-         serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
-      }
ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc,
serverBackuptc);
@@ -1068,6 +1078,16 @@
   protected void setupServer(int node, boolean fileStorage, boolean netty, boolean backup, int backupNode)
{
+ setupServer(node, fileStorage, true, netty, backup, backupNode);
+ }
+
+ protected void setupServer(int node,
+ boolean fileStorage,
+ boolean sharedStorage,
+ boolean netty,
+ boolean backup,
+ int backupNode)
+ {
if (servers[node] != null)
{
         throw new IllegalArgumentException("Already a server at node " + node);
@@ -1076,15 +1096,28 @@
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, backup));
configuration.setJournalMinFiles(2);
configuration.setJournalMaxAIO(1000);
- configuration.setJournalDirectory(getJournalDir(node, backup));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
configuration.setJournalMaxAIO(1000);
- configuration.setPagingDirectory(getPageDir(node, backup));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ int nodeDirectoryToUse = backupNode == -1 ? node : backupNode;
+ configuration.setBindingsDirectory(getBindingsDir(nodeDirectoryToUse, false));
+ configuration.setJournalDirectory(getJournalDir(nodeDirectoryToUse, false));
+ configuration.setPagingDirectory(getPageDir(nodeDirectoryToUse, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(nodeDirectoryToUse, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, backup));
+ configuration.setJournalDirectory(getJournalDir(node, backup));
+ configuration.setPagingDirectory(getPageDir(node, backup));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+ }
configuration.setClustered(true);
configuration.setJournalCompactMinFiles(0);
configuration.setBackup(backup);
@@ -1141,7 +1174,6 @@
servers[node] = server;
}
-
protected void setupServerWithDiscovery(int node,
String groupAddress,
int port,
@@ -1238,21 +1270,21 @@
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
- nettyBackuptc == null ? null : nettyBackuptc.getName()));
+                                                      nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
-                                                                                             : invmBackuptc.getName()));
+                                                                                              : invmBackuptc.getName()));
}
         BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               250,
-                                                                               connectorPairs);
+                                                                                null,
+                                                                                -1,
+                                                                                groupAddress,
+                                                                                port,
+                                                                                250,
+                                                                                connectorPairs);
configuration.getBroadcastGroupConfigurations().add(bcConfig);
@@ -1280,7 +1312,7 @@
if (netty)
{
params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
-                    org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+                     org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
}
else
{
@@ -1351,12 +1383,12 @@
pairs.add(connectorPair);
      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                     address,
-                                                                                     100,
-                                                                                     true,
-                                                                                     forwardWhenNoConsumers,
-                                                                                     maxHops,
-                                                                                     pairs);
+                                                                                      address,
+                                                                                      100,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1402,12 +1434,12 @@
}
      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                     address,
-                                                                                     250,
-                                                                                     true,
-                                                                                     forwardWhenNoConsumers,
-                                                                                     maxHops,
-                                                                                     pairs);
+                                                                                      address,
+                                                                                      250,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1466,18 +1498,16 @@
         Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), serverBackupTotc.getName());
-         // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
-
pairs.add(connectorPair);
}
      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                     address,
-                                                                                     250,
-                                                                                     true,
-                                                                                     forwardWhenNoConsumers,
-                                                                                     maxHops,
-                                                                                     pairs);
+                                                                                      address,
+                                                                                      250,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1498,12 +1528,12 @@
}
      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                     address,
-                                                                                     100,
-                                                                                     true,
-                                                                                     forwardWhenNoConsumers,
-                                                                                     maxHops,
-                                                                                     discoveryGroupName);
+                                                                                      address,
+                                                                                      100,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      discoveryGroupName);
      List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
clusterConfs.add(clusterConf);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -85,15 +85,15 @@
{
super(name);
}
-
+
public FailoverTest()
{
}
-
+
abstract class BaseListener implements SessionFailureListener
{
public void beforeReconnect(HornetQException me)
- {
+ {
}
}
@@ -115,7 +115,7 @@
public void connectionFailed(HornetQException me)
{
latch.countDown();
- }
+ }
}
session.addFailureListener(new MyListener());
@@ -136,7 +136,7 @@
}
fail(session, latch);
-
+
log.info("got here 1");
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -169,6 +169,77 @@
assertEquals(0, sf.numConnections());
}
+
+   /** It doesn't fail over; it restarts both servers, live and backup. The data should be received after the restart,
+    * and the servers should be able to connect without any problems. */
+ public void testRestartServers() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ setBody(i, message);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server0Service.stop();
+ server1Service.stop();
+
+ server1Service.start();
+ server0Service.start();
+
+ sf = getSessionFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ session = sf.createSession(true, true);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ assertEquals(i, message.getProperty("counter"));
+
+ message.acknowledge();
+ }
+
+ log.info("closing session");
+ session.close();
+
+ assertEquals(0, sf.numSessions());
+
+ assertEquals(0, sf.numConnections());
+ }
+
/**
* @param session
* @param latch
@@ -176,7 +247,7 @@
*/
   private void fail(ClientSession session, final CountDownLatch latch) throws InterruptedException
{
-
+
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
@@ -202,14 +273,13 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
latch.countDown();
}
-
-
+
}
session.addFailureListener(new MyListener());
@@ -228,7 +298,7 @@
producer.send(message);
}
-
+
fail(session, latch);
assertTrue(session.isRollbackOnly());
@@ -272,7 +342,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -330,7 +400,7 @@
}
assertNull(consumer.receive(1000));
-
+
session.commit();
session.close();
@@ -353,7 +423,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -391,15 +461,15 @@
fail(session, latch);
session.commit();
-
+
session.close();
-
+
session = sf.createSession(false, false);
-
+
consumer = session.createConsumer(ADDRESS);
session.start();
-
+
for (int i = 0; i < numMessages; i++)
{
// Only the persistent messages will survive
@@ -419,7 +489,7 @@
}
assertNull(consumer.receive(1000));
-
+
session.commit();
session.close();
@@ -428,7 +498,7 @@
assertEquals(0, sf.numConnections());
}
-
+
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();
@@ -442,7 +512,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -491,7 +561,7 @@
fail(session2, latch);
assertTrue(session2.isRollbackOnly());
-
+
try
{
session2.commit();
@@ -525,7 +595,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -578,7 +648,7 @@
fail(session2, latch);
assertFalse(session2.isRollbackOnly());
-
+
consumer = session2.createConsumer(ADDRESS);
for (int i = numMessages / 2; i < numMessages; i++)
@@ -597,7 +667,7 @@
session2.commit();
assertNull(consumer.receive(1000));
-
+
session1.close();
session2.close();
@@ -622,7 +692,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -692,7 +762,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -765,7 +835,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -839,7 +909,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -926,7 +996,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1011,7 +1081,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1072,7 +1142,7 @@
// Wait to be informed of failure
boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
+
log.info("waited for latch");
assertTrue(ok);
@@ -1087,9 +1157,9 @@
{
assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- //Thread.sleep(30000);
+ // Thread.sleep(30000);
+
session1.close();
session2.close();
@@ -1113,7 +1183,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1197,7 +1267,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1244,7 +1314,7 @@
      Map<ClientSession, List<ClientConsumer>> sessionConsumerMap = new HashMap<ClientSession, List<ClientConsumer>>();
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
CountDownLatch latch = new CountDownLatch(1);
@@ -1342,7 +1412,6 @@
assertEquals(0, sf.numConnections());
}
-
/*
* Browser will get reset to beginning after failover
*/
@@ -1359,7 +1428,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1439,7 +1508,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1522,7 +1591,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1619,7 +1688,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1687,7 +1756,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -1708,11 +1777,11 @@
ClientMessage message = session.createClientMessage(true);
if (i == 0)
- {
+ {
// Only need to add it on one message per tx
            message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, new SimpleString(txID));
}
-
+
setBody(i, message);
message.putIntProperty("counter", i);
@@ -1787,7 +1856,7 @@
ClientMessage message = session2.createClientMessage(true);
if (i == 0)
- {
+ {
// Only need to add it on one message per tx
            message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, new SimpleString(txID));
}
@@ -1843,7 +1912,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
+ class MyListener extends BaseListener
{
public void connectionFailed(HornetQException me)
{
@@ -2007,7 +2076,6 @@
}
}
-
/**
* @param i
* @param message
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -80,6 +80,6 @@
void setupMasterServer(int i, boolean fileStorage, boolean netty)
{
- setupServer(i, fileStorage, netty, 2);
+ setupServer(i, fileStorage, false, netty, false, 2);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -12,28 +12,22 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.message.impl.MessageImpl;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.utils.SimpleString;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -135,7 +135,7 @@
for (int i = 0; i < MIDDLE; i++)
{
- ClientMessage msg = cons.receive(10000);
+ ClientMessage msg = cons.receive(20000);
assertNotNull(msg);
msg.acknowledge();
if (transacted && i % 10 == 0)
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java                         (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicatedDistrubtionTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedDistrubtionTest extends ClusterTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testRedistribution() throws Exception
+ {
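+      // Session factories attach to live nodes 1 and 3; nodes 0 and 2 are their backups (see setUp)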
+ setupSessionFactory(1, 0, true, true);
+ setupSessionFactory(3, 2, true, true);
+
+ ClientSession sessionOne = sfs[1].createSession(true, true);
+
+ ClientSession sessionThree = sfs[3].createSession(false, false);
+
+ sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+ sessionThree.start();
+
+ waitForBindings(3, "test.SomeAddress", 1, 1, true);
+
+ try
+ {
+ ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = sessionOne.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ msg.putIntProperty(new SimpleString("key"), i);
+ producer.send(msg);
+ }
+
+ sessionOne.commit();
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+ if (i != received)
+ {
+ // Shouldn't this be a failure?
+ System.out.println(i + "!=" + received);
+ }
+ msg.acknowledge();
+ }
+
+ sessionThree.commit();
+
+ // consThree.close();
+
+         // TODO: Remove this sleep: if a node fails,
+         // redistribution may lose messages between the nodes.
+ Thread.sleep(500);
+
+ fail(sessionThree);
+
+ // sessionThree.close();
+ //
+ // setupSessionFactory(2, -1, true);
+ //
+ // sessionThree = sfs[2].createSession(true, true);
+ //
+ // sessionThree.start();
+
+ // consThree = sessionThree.createConsumer(ADDRESS);
+
+ for (int i = 50; i < 100; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+ if (i != received)
+ {
+ // Shouldn't this be a failure?
+ System.out.println(i + "!=" + received);
+ }
+ msg.acknowledge();
+ }
+
+ assertNull(consThree.receiveImmediate());
+
+ sessionThree.commit();
+
+ sessionOne.start();
+
+ ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+ assertNull(consOne.receiveImmediate());
+
+ }
+ finally
+ {
+ sessionOne.close();
+ sessionThree.close();
+ }
+ }
+
+ public void testSimpleRedistributionOverReplication() throws Exception
+ {
+ setupSessionFactory(1, 0, true, true);
+ setupSessionFactory(3, 2, true, true);
+
+ ClientSession sessionOne = sfs[1].createSession(true, true);
+
+ ClientSession sessionThree = sfs[3].createSession(false, false);
+
+ sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+ sessionThree.start();
+
+ waitForBindings(3, "test.SomeAddress", 1, 1, true);
+
+ try
+ {
+ ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = sessionOne.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ msg.putIntProperty(new SimpleString("key"), i);
+ producer.send(msg);
+ }
+
+ sessionOne.commit();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = consThree.receive(15000);
+
+ assertNotNull(msg);
+
+ System.out.println(i + " msg = " + msg);
+
+ int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+ if (i != received)
+ {
+ // Shouldn't this be a failure?
+ System.out.println(i + "!=" + received);
+ }
+ msg.acknowledge();
+ }
+
+ sessionThree.commit();
+
+ sessionOne.start();
+
+ ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+ assertNull(consOne.receiveImmediate());
+
+ }
+ finally
+ {
+ sessionOne.close();
+ sessionThree.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ /**
+ * @param session
+ * @param latch
+ * @throws InterruptedException
+ */
+ private void fail(final ClientSession session) throws InterruptedException
+ {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
+
+ /* (non-Javadoc)
+       * @see org.hornetq.core.client.SessionFailureListener#beforeReconnect(org.hornetq.core.exception.HornetQException)
+ */
+ public void beforeReconnect(final HornetQException exception)
+ {
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServer(0, true, isShared(), true, true, -1);
+ setupServer(1, true, isShared(), true, false, 0);
+ setupServer(2, true, isShared(), true, true, -1);
+ setupServer(3, true, isShared(), true, true, 2);
+
+      setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
+
+ AddressSettings as = new AddressSettings();
+ as.setRedistributionDelay(0);
+
+ getServer(0).getAddressSettingsRepository().addMatch("test.*", as);
+ getServer(1).getAddressSettingsRepository().addMatch("test.*", as);
+ getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
+      getServer(3).getAddressSettingsRepository().addMatch("test.*", as);
+
+ servers[0].start();
+ servers[2].start();
+ servers[1].start();
+ servers[3].start();
+ }
+
+ protected boolean isShared()
+ {
+ return false;
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ servers[2].stop();
+ servers[0].stop();
+ servers[1].stop();
+ servers[3].stop();
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java                         (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A SharedStoreDistributionTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SharedStoreDistributionTest extends ReplicatedDistrubtionTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected boolean isShared()
+ {
+ return true;
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -863,6 +863,7 @@
                .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
                                                backupParams));
backupConf.setBackup(true);
+ backupConf.setSharedStore(true);
backupService = HornetQ.newHornetQServer(backupConf, false);
backupService.start();
@@ -877,6 +878,7 @@
connectors.put(backupTC.getName(), backupTC);
connectors.put(liveTC.getName(), liveTC);
liveConf.setConnectorConfigurations(connectors);
+ liveConf.setSharedStore(true);
liveConf.setBackupConnectorName(backupTC.getName());
liveConf.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -37,6 +37,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
@@ -107,7 +108,8 @@
try
{
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
manager.start();
manager.stop();
}
@@ -117,6 +119,87 @@
}
}
+ public void testInvalidJournal() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ FailoverManager failoverManager = createFailoverManager();
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ executor);
+ manager.start();
+ try
+ {
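+            // The freshly started backup has empty journals, so this mismatched load information must be rejected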
+            manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(2,2), new JournalLoadInformation(2,2)});
+ fail("Exception was expected");
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
+ }
+
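+         // Default (empty) JournalLoadInformation matches the backup's empty journals, so no exception is expected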
+         manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(), new JournalLoadInformation()});
+
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ // should throw an exception if a second server connects to the same backup
+ public void testInvalidConnection() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ FailoverManager failoverManager = createFailoverManager();
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ executor);
+
+ manager.start();
+
+ try
+ {
+            ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager,
+                                                                         executor);
+
+ manager2.start();
+ fail("Exception was expected");
+ }
+ catch (Exception e)
+ {
+ }
+
+ manager.stop();
+
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
public void testConnectIntoNonBackup() throws Exception
{
@@ -132,7 +215,8 @@
try
{
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
try
{
@@ -166,7 +250,8 @@
try
{
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
manager.start();
         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -184,11 +269,9 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- blockOnReplication(manager);
-
assertEquals(1, manager.getActiveTokens().size());
- manager.closeContext();
+ blockOnReplication(manager);
for (int i = 0; i < 100; i++)
{
@@ -272,11 +355,11 @@
config.setBackup(true);
ArrayList<String> intercepts = new ArrayList<String>();
-
+
intercepts.add(TestInterceptor.class.getName());
-
+
config.setInterceptorClassNames(intercepts);
-
+
HornetQServer server = new HornetQServerImpl(config);
server.start();
@@ -285,7 +368,8 @@
try
{
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
manager.start();
         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -308,7 +392,7 @@
});
manager.closeContext();
-
+
server.stop();
assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -336,9 +420,28 @@
});
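+      // Close the replication context before waiting on the completion latch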
+ manager.closeContext();
+
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
+ public void testNoServer() throws Exception
+ {
+ FailoverManager failoverManager = createFailoverManager();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ executor);
+ manager.start();
+ fail("Exception expected");
+ }
+ catch (HornetQException expected)
+ {
+ assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
+ }
+ }
+
public void testNoActions() throws Exception
{
@@ -354,7 +457,8 @@
try
{
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
manager.start();
         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -371,11 +475,13 @@
}
});
- assertTrue(latch.await(1, TimeUnit.SECONDS));
+
assertEquals(1, manager.getActiveTokens().size());
manager.closeContext();
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
for (int i = 0; i < 100; i++)
{
// This is asynchronous. Have to wait completion
@@ -505,7 +611,6 @@
};
-
static class FakeJournal implements Journal
{
@@ -649,21 +754,21 @@
/* (non-Javadoc)
       * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
*/
- public long load(LoaderCallback reloadManager) throws Exception
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
{
- return 0;
+ return new JournalLoadInformation();
}
/* (non-Javadoc)
       * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
*/
- public long load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions,
- TransactionFailureCallback transactionFailure) throws Exception
+ public JournalLoadInformation load(List<RecordInfo> committedRecords,
+                                         List<PreparedTransactionInfo> preparedTransactions,
+                                         TransactionFailureCallback transactionFailure) throws Exception
{
- return 0;
+ return new JournalLoadInformation();
}
/* (non-Javadoc)
@@ -699,5 +804,21 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ return new JournalLoadInformation();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getNumberOfRecords()
+ */
+ public int getNumberOfRecords()
+ {
+ return 0;
+ }
+
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-03 16:55:02 UTC (rev 8195)
@@ -28,6 +28,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -47,6 +48,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -945,8 +947,9 @@
/* (non-Javadoc)
       * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
-      public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+      public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
+ return new JournalLoadInformation();
}
public void addGrouping(GroupBinding groupBinding) throws Exception
@@ -962,12 +965,13 @@
/* (non-Javadoc)
       * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
*/
- public void loadMessageJournal(PostOffice postOffice,
+ public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
                                                       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
+ return new JournalLoadInformation();
}
/* (non-Javadoc)
@@ -1166,10 +1170,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
*/
- public void loadInternalOnly() throws Exception
+ public JournalLoadInformation[] loadInternalOnly() throws Exception
{
-
-
+ return null;
}
/* (non-Javadoc)
@@ -1206,6 +1209,13 @@
{
}
+ /* (non-Javadoc)
       * @see org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
+ */
+ public void setReplicator(ReplicationManager replicator)
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory