JBoss hornetq SVN: r11176 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 06:28:45 -0400 (Wed, 10 Aug 2011)
New Revision: 11176
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
Removed:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Rename ReplicatingJournal (as there was a ReplicatEDJournal already)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 01:53:32 UTC (rev 11175)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -26,9 +26,9 @@
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.FileWrapperJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.ReplicatingJournal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -441,7 +441,7 @@
/**
* Reserves files (with the given fileID) in the specified journal, and places a
- * {@link ReplicatingJournal} in place to store messages while synchronization is going on.
+ * {@link FileWrapperJournal} in place to store messages while synchronization is going on.
* @param packet
* @throws Exception
*/
@@ -458,8 +458,8 @@
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
current.getFile().open(1, false);
registerJournal(packet.getJournalContentType().typeByte,
- new ReplicatingJournal(current, storage.hasCallbackSupport()));
- }
+ new FileWrapperJournal(current, storage.hasCallbackSupport()));
+ }
// XXX HORNETQ-720 really need to do away with this once the method calls get stable.
private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
Copied: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java (from rev 11169, branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -0,0 +1,229 @@
+package org.hornetq.core.journal.impl;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
+
+/**
+ * Journal used at a replicating backup server during the synchronization of data with the 'live'
+ * server. It just wraps a single {@link JournalFile}.
+ * <p>
+ * Its main purpose is to store the data as a Journal would, but without verifying records.
+ */
+public class FileWrapperJournal extends JournalBase implements Journal
+{
+ private final ReentrantLock lockAppend = new ReentrantLock();
+ // private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+
+ private final JournalFile currentFile;
+
+ /**
+ * @param file
+ */
+ public FileWrapperJournal(JournalFile file, boolean hasCallbackSupport)
+ {
+ super(hasCallbackSupport);
+ currentFile = file;
+ }
+
+ @Override
+ public void start() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ currentFile.getFile().close();
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------------------
+
+ // ------------------------
+
+// private void readLockJournal()
+// {
+// journalLock.readLock().lock();
+// }
+//
+// private void readUnlockJournal()
+// {
+// journalLock.readLock().unlock();
+// }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
+ throws Exception
+ {
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+
+ writeRecord(addRecord, sync, callback);
+ }
+
+ /**
+ * Write the record to the current file.
+ */
+ private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+ {
+
+
+ lockAppend.lock();
+ try
+ {
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ encoder.setFileID(currentFile.getRecordID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
+ }
+ else
+ {
+ currentFile.getFile().write(encoder, sync);
+ }
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void
+ appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
+ throws Exception
+ {
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ writeRecord(updateRecord, sync, callback);
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void lineUpContex(IOCompletion callback)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public int getAlignment() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getUserVersion()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void perfBlast(int pages)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void runDirectJournalBlast() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Deleted: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-10 01:53:32 UTC (rev 11175)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -1,232 +0,0 @@
-package org.hornetq.core.journal.impl;
-
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.LoaderCallback;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
-
-/**
- * Journal used at a replicating backup server during the synchronization of data with the 'live'
- * server.
- * <p>
- * Its main purpose is to store the data as a Journal would, but without verifying records.
- */
-public class ReplicatingJournal extends JournalBase implements Journal
-{
- private final ReentrantLock lockAppend = new ReentrantLock();
- private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
-
- private final JournalFile currentFile;
-
- /**
- * @param file
- */
- public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
- {
- super(hasCallbackSupport);
- currentFile = file;
- }
-
- @Override
- public void start() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void stop() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public boolean isStarted()
- {
- throw new UnsupportedOperationException();
- }
-
- // ------------------------
-
- // ------------------------
-
- private void readLockJournal()
- {
- journalLock.readLock().lock();
- }
-
- private void readUnlockJournal()
- {
- journalLock.readLock().unlock();
- }
-
- @Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
- throws Exception
- {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
-
- writeRecord(addRecord, sync, callback);
-
- }
-
- /**
- * Write the record to the current file.
- */
- private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
- {
-
-
- lockAppend.lock();
- try
- {
- if (callback != null)
- {
- callback.storeLineUp();
- }
-
- encoder.setFileID(currentFile.getRecordID());
-
- if (callback != null)
- {
- currentFile.getFile().write(encoder, sync, callback);
- }
- else
- {
- currentFile.getFile().write(encoder, sync);
- }
- }
- finally
- {
- lockAppend.unlock();
- }
- }
-
- @Override
- public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void
- appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
- throws Exception
- {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
- writeRecord(updateRecord, sync, callback);
- }
-
- @Override
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public JournalLoadInformation loadInternalOnly() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void lineUpContex(IOCompletion callback)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JournalLoadInformation load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public int getAlignment() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public int getNumberOfRecords()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getUserVersion()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void perfBlast(int pages)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void runDirectJournalBlast() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JournalLoadInformation loadSyncOnly() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-}
13 years, 5 months
JBoss hornetq SVN: r11175 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 21:53:32 -0400 (Tue, 09 Aug 2011)
New Revision: 11175
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
fixing a deadlock found on the testsuite
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 00:47:42 UTC (rev 11174)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 01:53:32 UTC (rev 11175)
@@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Interceptor;
@@ -36,6 +37,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -51,9 +53,9 @@
public class CoreProtocolManager implements ProtocolManager
{
private static final Logger log = Logger.getLogger(CoreProtocolManager.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private final HornetQServer server;
private final List<Interceptor> interceptors;
@@ -74,7 +76,7 @@
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
.getExecutor()
: null,
- server.getNodeID());
+ server.getNodeID());
Channel channel1 = rc.getChannel(1, -1);
@@ -113,29 +115,51 @@
else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ Executor executor = server.getThreadPool();
+
+ public void nodeUP(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ // Using an executor as most of the notifications on the Topology
+ // may come from a channel itself
+ // What could cause deadlocks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ }
+ });
}
-
- public void nodeDown(String nodeID)
+
+ public void nodeDown(final String nodeID)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ // Using an executor as most of the notifications on the Topology
+ // may come from a channel itself
+ // What could cause deadlocks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
+ });
}
-
+
public String toString()
{
return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
}
};
-
+
final boolean isCC = msg.isClusterConnection();
-
+
server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
+
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
@@ -165,8 +189,6 @@
}
}
});
-
-
return entry;
}
13 years, 5 months
JBoss hornetq SVN: r11174 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 20:47:42 -0400 (Tue, 09 Aug 2011)
New Revision: 11174
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-722
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-09 20:40:04 UTC (rev 11173)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-10 00:47:42 UTC (rev 11174)
@@ -1837,7 +1837,7 @@
}
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
- if (message.isDurable() && durable)
+ if (!reference.isPaged() && message.isDurable() && durable)
{
storageManager.updateScheduledDeliveryTime(reference);
}
13 years, 5 months
JBoss hornetq SVN: r11173 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 16:40:04 -0400 (Tue, 09 Aug 2011)
New Revision: 11173
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
Log:
just removing comment
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-09 20:08:58 UTC (rev 11172)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-09 20:40:04 UTC (rev 11173)
@@ -399,7 +399,6 @@
{
if (executor != null)
{
- log.debug(this + " is running runnable without an executor");
executor.execute(runnable);
}
else
13 years, 5 months
JBoss hornetq SVN: r11172 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 16:08:58 -0400 (Tue, 09 Aug 2011)
New Revision: 11172
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixing a deadlock found on the testsuite
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-09 19:29:50 UTC (rev 11171)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-09 20:08:58 UTC (rev 11172)
@@ -378,6 +378,8 @@
backup = false;
String nodeID = server.getNodeID().toString();
+
+ topology.setExecutor(executor);
TopologyMember member = topology.getMember(nodeID);
//swap backup as live and send it to everybody
13 years, 5 months
JBoss hornetq SVN: r11171 - in branches/Branch_2_2_EAP: tests/joram-tests/src/org/hornetq/jms and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 15:29:50 -0400 (Tue, 09 Aug 2011)
New Revision: 11171
Added:
branches/Branch_2_2_EAP/tests/joram-tests/src/org/hornetq/jms/JoramAggregationTest.java
Modified:
branches/Branch_2_2_EAP/build-hornetq.xml
branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/conform/message/MessageBodyTest.java
branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/framework/JMSTestCase.java
Log:
Speeding up JORAM tests
Modified: branches/Branch_2_2_EAP/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_EAP/build-hornetq.xml 2011-08-09 17:43:09 UTC (rev 11170)
+++ branches/Branch_2_2_EAP/build-hornetq.xml 2011-08-09 19:29:50 UTC (rev 11171)
@@ -1873,7 +1873,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${test.joram.classes.dir}">
- <include name="**/${test-mask}.class"/>
+ <include name="**/JoramAggregationTest.class"/>
</fileset>
</batchtest>
</junit>
Added: branches/Branch_2_2_EAP/tests/joram-tests/src/org/hornetq/jms/JoramAggregationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/joram-tests/src/org/hornetq/jms/JoramAggregationTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/joram-tests/src/org/hornetq/jms/JoramAggregationTest.java 2011-08-09 19:29:50 UTC (rev 11171)
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms;
+
+import java.io.IOException;
+import java.util.Hashtable;
+import java.util.Properties;
+
+import junit.extensions.TestSetup;
+import junit.framework.AssertionFailedError;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestListener;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.objectweb.jtests.jms.admin.Admin;
+import org.objectweb.jtests.jms.admin.AdminFactory;
+import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
+import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
+import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
+import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
+import org.objectweb.jtests.jms.conform.message.MessageTypeTest;
+import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest;
+import org.objectweb.jtests.jms.conform.message.properties.JMSXPropertyTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest;
+import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest;
+import org.objectweb.jtests.jms.conform.queue.QueueBrowserTest;
+import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest;
+import org.objectweb.jtests.jms.conform.selector.SelectorTest;
+import org.objectweb.jtests.jms.conform.session.QueueSessionTest;
+import org.objectweb.jtests.jms.conform.session.SessionTest;
+import org.objectweb.jtests.jms.conform.session.TopicSessionTest;
+import org.objectweb.jtests.jms.conform.session.UnifiedSessionTest;
+import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
+import org.objectweb.jtests.jms.framework.JMSTestCase;
+
+/**
+ * JoramAggregationTest.
+ *
+ * @author <a href="adrian(a)jboss.com">Adrian Brock</a>
+ * @version $Revision: 1.2 $
+ */
+public class JoramAggregationTest extends TestCase
+{
+ public JoramAggregationTest(String name)
+ {
+ super(name);
+ }
+
+
+
+ /** Used to similuate tests while renaming its names. */
+ private static class DummyTestCase extends TestCase
+ {
+ DummyTestCase(String name)
+ {
+ super (name);
+ }
+ }
+
+ /**
+ * One of the goals of this class also is to keep original classNames into testNames. So, you will realize several proxies existent here to
+ * keep these class names while executing method names.
+ */
+ static class TestProxy extends TestCase
+ {
+ Hashtable hashTests = new Hashtable();
+
+
+ public TestProxy(Test testcase, String name)
+ {
+ super(name);
+ this.testcase = testcase;
+ }
+
+ public int countTestCases()
+ {
+ return testcase.countTestCases();
+ }
+
+ /**
+ * Create a dummy test renaming its content
+ * @param test
+ * @return
+ */
+ private Test createDummyTest(Test test)
+ {
+ Test dummyTest = (Test)hashTests.get(test);
+ if (dummyTest==null)
+ {
+ if (test instanceof TestCase)
+ {
+ dummyTest = new DummyTestCase(this.getName() + ":"+ ((TestCase)test).getName());
+ } else
+ if (test instanceof TestSuite)
+ {
+ dummyTest = new DummyTestCase(this.getName() + ":"+ ((TestCase)test).getName());
+ }
+ else
+ {
+ dummyTest = new DummyTestCase(test.getClass().getName());
+ }
+
+ hashTests.put(test,dummyTest);
+ }
+
+ return dummyTest;
+ }
+
+ public void run(final TestResult result)
+ {
+ TestResult subResult = new TestResult();
+ subResult.addListener(new TestListener()
+ {
+ public void addError(Test subtest, Throwable throwable)
+ {
+ Test dummyTest = createDummyTest(subtest);
+ result.addError(dummyTest, throwable);
+ }
+
+ public void addFailure(Test subtest, AssertionFailedError assertionFailedError)
+ {
+ Test dummyTest = createDummyTest(subtest);
+ result.addFailure(dummyTest, assertionFailedError);
+ }
+
+ public void endTest(Test subtest)
+ {
+ Test dummyTest = createDummyTest(subtest);
+ result.endTest(dummyTest);
+ }
+
+ public void startTest(Test subtest)
+ {
+ Test dummyTest = createDummyTest(subtest);
+ result.startTest(dummyTest);
+ }
+ });
+ testcase.run(subResult);
+ }
+
+ Test testcase;
+ }
+
+
+
+
+
+ public static junit.framework.Test suite() throws Exception
+ {
+ TestSuite suite = new TestSuite();
+
+ suite.addTest(new TestProxy(TopicConnectionTest.suite(),TopicConnectionTest.class.getName()));
+ suite.addTest(new TestProxy(ConnectionTest.suite(), ConnectionTest.class.getName()));
+ suite.addTest(new TestProxy(MessageBodyTest.suite(), MessageBodyTest.class.getName()));
+ suite.addTest(new TestProxy(MessageDefaultTest.suite(), MessageDefaultTest.class.getName()));
+ suite.addTest(new TestProxy(MessageTypeTest.suite(), MessageTypeTest.class.getName()));
+ suite.addTest(new TestProxy(MessageHeaderTest.suite(), MessageHeaderTest.class.getName()));
+ suite.addTest(new TestProxy(JMSXPropertyTest.suite(), JMSXPropertyTest.class.getName()));
+ suite.addTest(new TestProxy(MessagePropertyConversionTest.suite(), MessagePropertyConversionTest.class.getName()));
+ suite.addTest(new TestProxy(MessagePropertyTest.suite(), MessagePropertyTest.class.getName()));
+ suite.addTest(new TestProxy(QueueBrowserTest.suite(), QueueBrowserTest.class.getName()));
+ suite.addTest(new TestProxy(TemporaryQueueTest.suite(), TemporaryQueueTest.class.getName()));
+ suite.addTest(new TestProxy(SelectorSyntaxTest.suite(), SelectorSyntaxTest.class.getName()));
+ suite.addTest(new TestProxy(SelectorTest.suite(), SelectorTest.class.getName()));
+ suite.addTest(new TestProxy(QueueSessionTest.suite(), QueueSessionTest.class.getName()));
+ suite.addTest(new TestProxy(SessionTest.suite(), SessionTest.class.getName()));
+ suite.addTest(new TestProxy(TopicSessionTest.suite(), TopicSessionTest.class.getName()));
+ suite.addTest(new TestProxy(UnifiedSessionTest.suite(), UnifiedSessionTest.class.getName()));
+ suite.addTest(new TestProxy(TemporaryTopicTest.suite(), TemporaryTopicTest.class.getName()));
+
+ return new TestAggregation(suite);
+ }
+ /**
+ * Should be overriden
+ * @return
+ */
+ protected static Properties getProviderProperties() throws IOException
+ {
+ Properties props = new Properties();
+ props.load(ClassLoader.getSystemResourceAsStream(JMSTestCase.PROP_FILE_NAME));
+ return props;
+ }
+
+
+ static class TestAggregation extends TestSetup
+ {
+
+ Admin admin;
+
+ /**
+ * @param test
+ */
+ public TestAggregation(Test test)
+ {
+ super(test);
+ }
+
+ public void setUp() throws Exception
+ {
+ JMSTestCase.startServer = false;
+ // Admin step
+ // gets the provider administration wrapper...
+ Properties props = getProviderProperties();
+ admin = AdminFactory.getAdmin(props);
+ admin.startServer();
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ System.out.println("TearDown");
+ admin.stopServer();
+ JMSTestCase.startServer = true;
+ }
+
+ }
+}
Modified: branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/conform/message/MessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/conform/message/MessageBodyTest.java 2011-08-09 17:43:09 UTC (rev 11170)
+++ branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/conform/message/MessageBodyTest.java 2011-08-09 19:29:50 UTC (rev 11171)
@@ -19,6 +19,8 @@
import javax.jms.TextMessage;
import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
import org.objectweb.jtests.jms.framework.PTPTestCase;
import org.objectweb.jtests.jms.framework.TestConfig;
@@ -31,7 +33,16 @@
*/
public class MessageBodyTest extends PTPTestCase
{
+
+ /**
+ * Method to use this class in a Test suite
+ */
+ public static Test suite()
+ {
+ return new TestSuite(MessageBodyTest.class);
+ }
+
/**
* Test that the <code>TextMessage.clearBody()</code> method does nto clear the
* message properties.
Modified: branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/framework/JMSTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/framework/JMSTestCase.java 2011-08-09 17:43:09 UTC (rev 11170)
+++ branches/Branch_2_2_EAP/tests/joram-tests/src/org/objectweb/jtests/jms/framework/JMSTestCase.java 2011-08-09 19:29:50 UTC (rev 11171)
@@ -35,7 +35,9 @@
*/
public abstract class JMSTestCase extends TestCase
{
- private static final String PROP_FILE_NAME = "provider.properties";
+ public static final String PROP_FILE_NAME = "provider.properties";
+
+ public static boolean startServer = true;
protected Admin admin;
@@ -91,8 +93,10 @@
Properties props = getProviderProperties();
admin = AdminFactory.getAdmin(props);
- admin.startServer();
-
+ if (startServer)
+ {
+ admin.startServer();
+ }
admin.start();
}
@@ -101,8 +105,10 @@
{
admin.stop();
- admin.stopServer();
-
+ if (startServer)
+ {
+ admin.stopServer();
+ }
super.tearDown();
}
13 years, 5 months
JBoss hornetq SVN: r11170 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 13:43:09 -0400 (Tue, 09 Aug 2011)
New Revision: 11170
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
Log:
fixing deadlock after my fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-09 13:40:37 UTC (rev 11169)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-09 17:43:09 UTC (rev 11170)
@@ -172,23 +172,32 @@
if (replaced)
{
- ArrayList<ClusterTopologyListener> copy = copyListeners();
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
+ final ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+
+ // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
+ execute(new Runnable(){
+ public void run()
{
- Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
- }
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
+ }
- try
- {
- listener.nodeUP(nodeId, member.getConnector(), last);
+ try
+ {
+ listener.nodeUP(nodeId, member.getConnector(), last);
+ }
+ catch (Throwable e)
+ {
+ log.warn (e.getMessage(), e);
+ }
+ }
}
- catch (Throwable e)
- {
- log.warn (e.getMessage(), e);
- }
- }
+ });
+
}
return replaced;
@@ -230,16 +239,22 @@
if (member != null)
{
- ArrayList<ClusterTopologyListener> copy = copyListeners();
+ final ArrayList<ClusterTopologyListener> copy = copyListeners();
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
+ // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
+ execute(new Runnable(){
+ public void run()
{
- Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
+ }
+ listener.nodeDown(nodeId);
+ }
}
- listener.nodeDown(nodeId);
- }
+ });
}
return member != null;
}
@@ -379,6 +394,19 @@
}
return null;
}
+
+ private void execute(Runnable runnable)
+ {
+ if (executor != null)
+ {
+ log.debug(this + " is running runnable without an executor");
+ executor.execute(runnable);
+ }
+ else
+ {
+ runnable.run();
+ }
+ }
/* (non-Javadoc)
* @see java.lang.Object#toString()
13 years, 5 months
JBoss hornetq SVN: r11169 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:40:37 -0400 (Tue, 09 Aug 2011)
New Revision: 11169
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 The JournalStorageManager must be started.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-09 13:40:07 UTC (rev 11168)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-09 13:40:37 UTC (rev 11169)
@@ -542,7 +542,6 @@
initialisePart1();
clusterManager.start();
-
String liveConnectorName = configuration.getLiveConnectorName();
if (liveConnectorName == null)
{
@@ -606,11 +605,10 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
}
-
configuration.setBackup(false);
+ storageManager.start();
initialisePart2();
-
clusterManager.activate();
}
13 years, 5 months
JBoss hornetq SVN: r11168 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:40:07 -0400 (Tue, 09 Aug 2011)
New Revision: 11168
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
Fix spelling.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-08-09 13:39:34 UTC (rev 11167)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-08-09 13:40:07 UTC (rev 11168)
@@ -225,7 +225,7 @@
this.persistent = persistent;
}
- /** This method sould alwas be called from a single threaded executor */
+ /** This method should always be called from a single threaded executor */
protected void cleanup()
{
ArrayList<Long> deleteList;
13 years, 5 months
JBoss hornetq SVN: r11167 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:39:34 -0400 (Tue, 09 Aug 2011)
New Revision: 11167
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Only change the replicator value once we have the lock.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 13:38:59 UTC (rev 11166)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 13:39:34 UTC (rev 11167)
@@ -358,7 +358,6 @@
throw new IllegalStateException("JournalStorageManager must be started...");
}
assert replicationManager != null;
- replicator = replicationManager;
if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
{
@@ -380,6 +379,7 @@
storageManagerLock.writeLock().lock();
try
{
+ replicator = replicationManager;
localMessageJournal.writeLock();
localBindingsJournal.writeLock();
13 years, 5 months