JBoss hornetq SVN: r11753 - in trunk/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-24 07:24:34 -0500 (Thu, 24 Nov 2011)
New Revision: 11753
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send live's nodeID as soon as possible, as it is needed to detect failures.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-24 12:24:34 UTC (rev 11753)
@@ -396,8 +396,8 @@
pagingManager.lock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES, nodeID);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS, nodeID);
pageFilesToSync = getPageInformationForSync(pagingManager);
largeMessageFilesToSync = getLargeMessageInformation();
}
@@ -527,11 +527,12 @@
}
}
- private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent contentType) throws Exception
+ private JournalFile[]
+ prepareJournalForCopy(Journal journal, JournalContent contentType, String nodeID) throws Exception
{
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
- replicator.sendStartSyncMessage(datafiles, contentType);
+ replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
return datafiles;
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-24 12:24:34 UTC (rev 11753)
@@ -30,9 +30,10 @@
this.nodeID = nodeID;
}
- public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
+ public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
{
this();
+ this.nodeID = nodeID;
synchronizationIsFinished = false;
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
@@ -46,11 +47,9 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeBoolean(synchronizationIsFinished);
+ buffer.writeString(nodeID);
if (synchronizationIsFinished)
- {
- buffer.writeString(nodeID);
return;
- }
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -63,9 +62,9 @@
public void decodeRest(final HornetQBuffer buffer)
{
synchronizationIsFinished = buffer.readBoolean();
+ nodeID = buffer.readString();
if (synchronizationIsFinished)
{
- nodeID = buffer.readString();
return;
}
journalType = JournalContent.getType(buffer.readByte());
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-24 12:24:34 UTC (rev 11753)
@@ -93,7 +93,8 @@
* @param contentType
* @throws HornetQException
*/
- void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+ void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
+ throws HornetQException;
/**
* Informs backup that data synchronization is done.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24 12:24:34 UTC (rev 11753)
@@ -131,6 +131,7 @@
// Public --------------------------------------------------------
+ @Override
public void registerJournal(final byte id, final Journal journal)
{
if (journals == null || id >= journals.length)
@@ -150,6 +151,7 @@
journals[id] = journal;
}
+ @Override
public void handlePacket(final Packet packet)
{
PacketImpl response = new ReplicationResponseMessage();
@@ -341,17 +343,13 @@
started = false;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
- */
+ @Override
public Channel getChannel()
{
return channel;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
- */
+ @Override
public void setChannel(final Channel channel)
{
this.channel = channel;
@@ -568,7 +566,11 @@
synchronized (this)
{
if (!started)
- return;
+ return;
+ if (packet.getNodeID() != null)
+ {
+ quorumManager.setLiveID(packet.getNodeID());
+ }
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
log.info("Journal " + packet.getJournalContentType() + ". Reserving fileIDs for synchronization: " +
Arrays.toString(packet.getFileIds()));
@@ -580,7 +582,7 @@
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
registerJournal(packet.getJournalContentType().typeByte, syncJournal);
}
- }
+ }
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24 12:24:34 UTC (rev 11753)
@@ -110,9 +110,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
+ @Override
public void appendUpdateRecord(final byte journalID,
final long id,
final byte recordType,
@@ -124,9 +122,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
- */
+ @Override
public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
if (enabled)
@@ -585,10 +581,13 @@
}
@Override
- public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+ public
+ void
+ sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
+ throws HornetQException
{
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID));
}
@Override
13 years, 1 month
JBoss hornetq SVN: r11752 - branches.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-24 05:25:58 -0500 (Thu, 24 Nov 2011)
New Revision: 11752
Removed:
branches/HORNETQ-720_Replication/
Log:
Remove HORNETQ-720 branch as work is now done at trunk
13 years, 1 month
JBoss hornetq SVN: r11751 - branches.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-24 05:23:37 -0500 (Thu, 24 Nov 2011)
New Revision: 11751
Removed:
branches/HORNETQ-698_SplitJournal/
Log:
remove HORNETQ-698 branch for splitting the Journal
13 years, 1 month
JBoss hornetq SVN: r11750 - in branches/Branch_2_2_AS7: src/main/org/hornetq/core/protocol/core/impl and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-23 16:54:38 -0500 (Wed, 23 Nov 2011)
New Revision: 11750
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
Removed:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
Modified:
branches/Branch_2_2_AS7/build-hornetq.xml
branches/Branch_2_2_AS7/build.xml
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
moving changes from EAP
Modified: branches/Branch_2_2_AS7/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_AS7/build-hornetq.xml 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/build-hornetq.xml 2011-11-23 21:54:38 UTC (rev 11750)
@@ -1660,62 +1660,62 @@
</javac>
</target>
- <target name="performance-tests" depends="jar, compile-unit-tests">
+ <target name="performance-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/performance/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="integration-tests" depends="jar, compile-unit-tests">
+ <target name="integration-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="management-tests" depends="jar, compile-unit-tests">
+ <target name="management-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/management/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="jms-management-tests" depends="jar, compile-unit-tests">
+ <target name="jms-management-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/jms/server/management/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="spring-tests" depends="jar, compile-unit-tests">
+ <target name="spring-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/spring/*${test-mask}.class"/>
</antcall>
</target>
- <target name="failover-tests" depends="jar, compile-unit-tests">
+ <target name="failover-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/failover/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="distribution-tests" depends="jar, compile-unit-tests">
+ <target name="distribution-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/distribution/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="cluster-tests" depends="jar, compile-unit-tests">
+ <target name="cluster-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="concurrent-tests" depends="jar, compile-unit-tests">
+ <target name="concurrent-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/concurrent/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="unit-tests" depends="jar, compile-unit-tests">
+ <target name="unit-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/unit/**/*${test-mask}.class"/>
<!-- if tests.validate.error is defined, it will fail the build in case of any test failure -->
@@ -1723,13 +1723,13 @@
</antcall>
</target>
- <target name="timing-tests" depends="jar, compile-unit-tests">
+ <target name="timing-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/timing/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="tests" depends="jar, compile-unit-tests">
+ <target name="tests">
<echo message=""/>
<echo message="Running unit tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}"/>
<echo message="classpath is:${toString:unit.test.execution.classpath}"/>
@@ -1957,9 +1957,9 @@
</junit>
</target>
- <target name="all-tests" depends="unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests, joram-tests, rest-tests"/>
+ <target name="all-tests" depends="jar, unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests, joram-tests, rest-tests"/>
- <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/>
+ <target name="hudson-tests" depends="jar, unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/>
<target name="compile-reports">
<mkdir dir="${test.stylesheets.dir}"/>
Modified: branches/Branch_2_2_AS7/build.xml
===================================================================
--- branches/Branch_2_2_AS7/build.xml 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/build.xml 2011-11-23 21:54:38 UTC (rev 11750)
@@ -303,6 +303,7 @@
</target>
<target name="jms-tests" depends="createthirdparty">
+ <ant antfile="build-hornetq.xml" target="jar"/>
<ant antfile="build-hornetq.xml" target="jms-tests"/>
<ant antfile="build-hornetq.xml" target="compile-reports"/>
</target>
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -177,17 +177,16 @@
return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
}
};
-
- final boolean isCC = msg.isClusterConnection();
+
if (acceptorUsed.getClusterConnection() != null)
{
- acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().addClusterTopologyListener(listener);
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
{
- acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener);
}
});
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -42,9 +42,9 @@
void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
- void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+ void addClusterTopologyListener(ClusterTopologyListener listener);
- void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
/**
* @return a Map of node ID and addresses
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -246,7 +246,7 @@
clusterConnector = new StaticClusterConnector(tcConfigs);
- backupServerLocator = clusterConnector.createServerLocator(false);
+ backupServerLocator = clusterConnector.createServerLocator();
if (backupServerLocator != null)
{
@@ -356,7 +356,7 @@
clusterConnector = new DiscoveryClusterConnector(dg);
- backupServerLocator = clusterConnector.createServerLocator(true);
+ backupServerLocator = clusterConnector.createServerLocator();
if (backupServerLocator != null)
{
@@ -507,7 +507,7 @@
return topology.getMember(manager.getNodeId());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topology.addClusterTopologyListener(listener);
@@ -515,7 +515,7 @@
topology.sendTopology(listener);
}
- public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
topology.removeClusterTopologyListener(listener);
}
@@ -642,7 +642,7 @@
backupServerLocator = null;
}
- serverLocator = clusterConnector.createServerLocator(true);
+ serverLocator = clusterConnector.createServerLocator();
if (serverLocator != null)
{
@@ -680,7 +680,7 @@
this.serverLocator.setRetryInterval(retryInterval);
}
- serverLocator.addClusterTopologyListener(this);
+ addClusterTopologyListener(this);
serverLocator.setAfterConnectionInternalListener(this);
@@ -1567,7 +1567,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator(boolean includeTopology);
+ ServerLocatorInternal createServerLocator();
}
private class StaticClusterConnector implements ClusterConnector
@@ -1579,7 +1579,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
+ public ServerLocatorInternal createServerLocator()
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1587,7 +1587,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1617,9 +1617,9 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
+ public ServerLocatorInternal createServerLocator()
{
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, dg);
return locator;
}
Deleted: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -1,144 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.utils.UUIDGenerator;
-
-import java.util.concurrent.Semaphore;
-
-import static org.hornetq.core.server.impl.InVMNodeManager.State.*;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
- * Date: Oct 13, 2010
- * Time: 3:55:47 PM
- */
-public class InVMNodeManager extends NodeManager
-{
-
- private Semaphore liveLock;
-
- private Semaphore backupLock;
-
- public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
-
- public State state = NOT_STARTED;
-
- public InVMNodeManager()
- {
- liveLock = new Semaphore(1);
- backupLock = new Semaphore(1);
- uuid = UUIDGenerator.getInstance().generateUUID();
- nodeID = new SimpleString(uuid.toString());
- }
-
- @Override
- public void awaitLiveNode() throws Exception
- {
- do
- {
- while (state == NOT_STARTED)
- {
- Thread.sleep(2000);
- }
-
- liveLock.acquire();
-
- if (state == PAUSED)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == FAILING_BACK)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == LIVE)
- {
- break;
- }
- }
- while (true);
- }
-
- @Override
- public void startBackup() throws Exception
- {
- backupLock.acquire();
- }
-
- @Override
- public void startLiveNode() throws Exception
- {
- state = FAILING_BACK;
- liveLock.acquire();
- state = LIVE;
- }
-
- @Override
- public void pauseLiveServer() throws Exception
- {
- state = PAUSED;
- liveLock.release();
- }
-
- @Override
- public void crashLiveServer() throws Exception
- {
- //overkill as already set to live
- state = LIVE;
- liveLock.release();
- }
-
- @Override
- public void stopBackup() throws Exception
- {
- backupLock.release();
- }
-
- @Override
- public void releaseBackup()
- {
- releaseBackupNode();
- }
-
- @Override
- public boolean isAwaitingFailback() throws Exception
- {
- return state == FAILING_BACK;
- }
-
- @Override
- public boolean isBackupLive() throws Exception
- {
- return liveLock.availablePermits() == 0;
- }
-
- @Override
- public void interrupt()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- private void releaseBackupNode()
- {
- if(backupLock != null)
- {
- backupLock.release();
- }
- }
-}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -399,12 +399,29 @@
// We execute this on the same executor to make sure the force delivery message is written after
// any delivery is completed
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
-
- callback.sendMessage(forcedDeliveryMessage, id, 0);
+ synchronized (lock)
+ {
+ if (transferring)
+ {
+ // Case it's transferring (reattach), we will retry later
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ forceDelivery(sequence);
+ }
+ });
+ }
+ else
+ {
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ }
}
catch (Exception e)
{
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -283,7 +283,7 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
+ return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBuffer().capacity() +
",expiration=" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : 0) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -14,7 +14,7 @@
package org.hornetq.tests.integration.cluster;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.util.ServiceTestBase;
import java.util.ArrayList;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -41,8 +41,8 @@
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
/**
* A BridgeReconnectTest
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -57,7 +57,7 @@
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -34,8 +34,8 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -21,8 +21,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import java.util.ArrayList;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -41,7 +41,7 @@
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -26,7 +26,7 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -24,8 +24,8 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.security.Role;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -27,7 +27,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -13,10 +13,14 @@
package org.hornetq.tests.integration.cluster.reattach;
+import java.util.HashMap;
+import java.util.Map;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServers;
/**
@@ -37,8 +41,12 @@
liveConf.setJMXManagementEnabled(false);
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations().clear();
+
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+
liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory"));
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory", connectionParams));
liveServer = HornetQServers.newHornetQServer(liveConf, false);
liveServer.start();
}
@@ -46,11 +54,15 @@
@Override
protected ServerLocator createLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) ;
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+
+
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", connectionParams)) ;
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(1024 * 1024);
locator.setAckBatchSize(0);
return locator;
}
-
+
}
Added: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java (rev 0)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.util;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.util.concurrent.Semaphore;
+
+import static org.hornetq.tests.integration.cluster.util.InVMNodeManager.State.*;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 13, 2010
+ * Time: 3:55:47 PM
+ */
+public class InVMNodeManager extends NodeManager
+{
+
+ private Semaphore liveLock;
+
+ private Semaphore backupLock;
+
+ public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
+
+ public State state = NOT_STARTED;
+
+ public InVMNodeManager()
+ {
+ liveLock = new Semaphore(1);
+ backupLock = new Semaphore(1);
+ uuid = UUIDGenerator.getInstance().generateUUID();
+ nodeID = new SimpleString(uuid.toString());
+ }
+
+ @Override
+ public void awaitLiveNode() throws Exception
+ {
+ do
+ {
+ while (state == NOT_STARTED)
+ {
+ Thread.sleep(2000);
+ }
+
+ liveLock.acquire();
+
+ if (state == PAUSED)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == FAILING_BACK)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == LIVE)
+ {
+ break;
+ }
+ }
+ while (true);
+ }
+
+ @Override
+ public void startBackup() throws Exception
+ {
+ backupLock.acquire();
+ }
+
+ @Override
+ public void startLiveNode() throws Exception
+ {
+ state = FAILING_BACK;
+ liveLock.acquire();
+ state = LIVE;
+ }
+
+ @Override
+ public void pauseLiveServer() throws Exception
+ {
+ state = PAUSED;
+ liveLock.release();
+ }
+
+ @Override
+ public void crashLiveServer() throws Exception
+ {
+ //overkill as already set to live
+ state = LIVE;
+ liveLock.release();
+ }
+
+ @Override
+ public void stopBackup() throws Exception
+ {
+ backupLock.release();
+ }
+
+ @Override
+ public void releaseBackup()
+ {
+ releaseBackupNode();
+ }
+
+ @Override
+ public boolean isAwaitingFailback() throws Exception
+ {
+ return state == FAILING_BACK;
+ }
+
+ @Override
+ public boolean isBackupLive() throws Exception
+ {
+ return liveLock.availablePermits() == 0;
+ }
+
+ @Override
+ public void interrupt()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ private void releaseBackupNode()
+ {
+ if(backupLock != null)
+ {
+ backupLock.release();
+ }
+ }
+}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-11-23 21:01:00 UTC (rev 11749)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-11-23 21:54:38 UTC (rev 11750)
@@ -40,14 +40,12 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
@@ -55,6 +53,7 @@
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.jms.server.management.JMSUtil;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
13 years, 1 month
JBoss hornetq SVN: r11749 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-23 16:01:00 -0500 (Wed, 23 Nov 2011)
New Revision: 11749
Added:
tags/HornetQ_2_2_8_EAP_GA/
Log:
retagging 2.2.8.EAP.GA
13 years, 1 month
JBoss hornetq SVN: r11748 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-23 16:00:27 -0500 (Wed, 23 Nov 2011)
New Revision: 11748
Removed:
tags/HornetQ_2_2_8_EAP_GA/
Log:
retagging 2.2.8.EAP.GA
13 years, 1 month
JBoss hornetq SVN: r11747 - branches/Branch_2_2_EAP.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-11-23 15:12:35 -0500 (Wed, 23 Nov 2011)
New Revision: 11747
Modified:
branches/Branch_2_2_EAP/build-hornetq.xml
branches/Branch_2_2_EAP/build.xml
Log:
Changing some dependencies on tests on build.xml and build-hornetq.xml to avoid double compilation
Modified: branches/Branch_2_2_EAP/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_EAP/build-hornetq.xml 2011-11-23 16:14:50 UTC (rev 11746)
+++ branches/Branch_2_2_EAP/build-hornetq.xml 2011-11-23 20:12:35 UTC (rev 11747)
@@ -1658,62 +1658,62 @@
</javac>
</target>
- <target name="performance-tests" depends="jar, compile-unit-tests">
+ <target name="performance-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/performance/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="integration-tests" depends="jar, compile-unit-tests">
+ <target name="integration-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="management-tests" depends="jar, compile-unit-tests">
+ <target name="management-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/management/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="jms-management-tests" depends="jar, compile-unit-tests">
+ <target name="jms-management-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/jms/server/management/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="spring-tests" depends="jar, compile-unit-tests">
+ <target name="spring-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/spring/*${test-mask}.class"/>
</antcall>
</target>
- <target name="failover-tests" depends="jar, compile-unit-tests">
+ <target name="failover-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/failover/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="distribution-tests" depends="jar, compile-unit-tests">
+ <target name="distribution-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/distribution/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="cluster-tests" depends="jar, compile-unit-tests">
+ <target name="cluster-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="concurrent-tests" depends="jar, compile-unit-tests">
+ <target name="concurrent-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/concurrent/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="unit-tests" depends="jar, compile-unit-tests">
+ <target name="unit-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/unit/**/*${test-mask}.class"/>
<!-- if tests.validate.error is defined, it will fail the build in case of any test failure -->
@@ -1721,13 +1721,13 @@
</antcall>
</target>
- <target name="timing-tests" depends="jar, compile-unit-tests">
+ <target name="timing-tests" depends="compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/timing/**/*${test-mask}.class"/>
</antcall>
</target>
- <target name="tests" depends="jar, compile-unit-tests">
+ <target name="tests">
<echo message=""/>
<echo message="Running unit tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}"/>
<echo message="classpath is:${toString:unit.test.execution.classpath}"/>
@@ -1955,9 +1955,9 @@
</junit>
</target>
- <target name="all-tests" depends="unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests, joram-tests, rest-tests"/>
+ <target name="all-tests" depends="jar, unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests, joram-tests, rest-tests"/>
- <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/>
+ <target name="hudson-tests" depends="jar, unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/>
<target name="compile-reports">
<mkdir dir="${test.stylesheets.dir}"/>
Modified: branches/Branch_2_2_EAP/build.xml
===================================================================
--- branches/Branch_2_2_EAP/build.xml 2011-11-23 16:14:50 UTC (rev 11746)
+++ branches/Branch_2_2_EAP/build.xml 2011-11-23 20:12:35 UTC (rev 11747)
@@ -303,6 +303,7 @@
</target>
<target name="jms-tests" depends="createthirdparty">
+ <ant antfile="build-hornetq.xml" target="jar"/>
<ant antfile="build-hornetq.xml" target="jms-tests"/>
<ant antfile="build-hornetq.xml" target="compile-reports"/>
</target>
13 years, 1 month
JBoss hornetq SVN: r11746 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-23 11:14:50 -0500 (Wed, 23 Nov 2011)
New Revision: 11746
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
Log:
Using NIO on the NettyReattachTest
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-11-23 10:36:50 UTC (rev 11745)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-11-23 16:14:50 UTC (rev 11746)
@@ -13,10 +13,14 @@
package org.hornetq.tests.integration.cluster.reattach;
+import java.util.HashMap;
+import java.util.Map;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServers;
/**
@@ -37,8 +41,12 @@
liveConf.setJMXManagementEnabled(false);
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations().clear();
+
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+
liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory"));
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory", connectionParams));
liveServer = HornetQServers.newHornetQServer(liveConf, false);
liveServer.start();
}
@@ -46,11 +54,15 @@
@Override
protected ServerLocator createLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) ;
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+
+
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", connectionParams)) ;
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(1024 * 1024);
locator.setAckBatchSize(0);
return locator;
}
-
+
}
13 years, 1 month
JBoss hornetq SVN: r11745 - branches/Branch_2_2_EAP.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-23 05:36:50 -0500 (Wed, 23 Nov 2011)
New Revision: 11745
Modified:
branches/Branch_2_2_EAP/build-hornetq.properties
branches/Branch_2_2_EAP/build-hornetq.xml
Log:
Test cases are hanging and not completing. Fork per testcase, fork timeout is 10 min.
Modified: branches/Branch_2_2_EAP/build-hornetq.properties
===================================================================
--- branches/Branch_2_2_EAP/build-hornetq.properties 2011-11-23 05:02:11 UTC (rev 11744)
+++ branches/Branch_2_2_EAP/build-hornetq.properties 2011-11-23 10:36:50 UTC (rev 11745)
@@ -24,8 +24,8 @@
junit.haltonfailure=false
junit.fork=true
junit.includeantruntime=true
-# 120 mins
-junit.timeout=7200000
+# 10 mins - assumes per test case JVM fork
+junit.timeout=600000
# 150 mins
clustering.junit.timeout=9000000
# 90 mins
Modified: branches/Branch_2_2_EAP/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_EAP/build-hornetq.xml 2011-11-23 05:02:11 UTC (rev 11744)
+++ branches/Branch_2_2_EAP/build-hornetq.xml 2011-11-23 10:36:50 UTC (rev 11745)
@@ -1737,7 +1737,7 @@
<mkdir dir="${logs.dir}"/>
<junit printsummary="${junit.printsummary}"
fork="on"
- forkMode="once"
+ forkMode="perTest"
includeantruntime="${junit.includeantruntime}"
haltonerror="${junit.haltonerror}"
haltonfailure="${junit.haltonfailure}"
13 years, 1 month
JBoss hornetq SVN: r11744 - in trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-23 00:02:11 -0500 (Wed, 23 Nov 2011)
New Revision: 11744
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
refactoring
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 21:37:07 UTC (rev 11743)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-23 05:02:11 UTC (rev 11744)
@@ -16,10 +16,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
/**
*
@@ -133,9 +136,6 @@
return receipt;
}
- public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception;
-
public abstract StompFrame createStompFrame(String command);
public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException;
@@ -298,4 +298,56 @@
return response;
}
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception
+ {
+ StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
+ subscription.getID());
+ }
+
+ synchronized (serverMessage)
+ {
+
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer
+ .writerIndex() : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
+ + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
+ || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ frame.setByteBody(data);
+
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
+ deliveryCount);
+ }
+
+ return frame;
+ }
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 21:37:07 UTC (rev 11743)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-23 05:02:11 UTC (rev 11744)
@@ -171,56 +171,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception
- {
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- synchronized(serverMessage)
- {
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- : serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
-
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- }
-
- return frame;
-
- }
-
- @Override
public StompFrame createStompFrame(String command)
{
return new StompFrameV10(command);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 21:37:07 UTC (rev 11743)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-23 05:02:11 UTC (rev 11744)
@@ -247,53 +247,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
- throws Exception
- {
- StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- : serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
-
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
- return frame;
-
- }
-
- @Override
public void replySent(StompFrame reply)
{
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
13 years, 1 month