JBoss hornetq SVN: r11196 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-12 09:58:57 -0400 (Fri, 12 Aug 2011)
New Revision: 11196
Modified:
trunk/pom.xml
Log:
Upgrade 'site' and 'checkstyle' maven plugins
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-08-12 12:37:16 UTC (rev 11195)
+++ trunk/pom.xml 2011-08-12 13:58:57 UTC (rev 11196)
@@ -435,7 +435,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
- <version>3.0-beta-3</version>
+ <version>3.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -606,7 +606,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.6</version>
+ <version>2.7</version>
<configuration>
<configLocation>checkstyle.xml</configLocation>
</configuration>
13 years, 5 months
JBoss hornetq SVN: r11195 - trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-12 08:37:16 -0400 (Fri, 12 Aug 2011)
New Revision: 11195
Removed:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
Log:
Delete empty test class.
Deleted: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java 2011-08-12 02:20:12 UTC (rev 11194)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java 2011-08-12 12:37:16 UTC (rev 11195)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.paging.impl;
-
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A PagePositionTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PagePositionTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testNextSequenceOf()
- {
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
13 years, 5 months
JBoss hornetq SVN: r11194 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 22:20:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11194
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Improving testsuite
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -1137,9 +1137,19 @@
super.finalize();
}
+
+ public void cleanup()
+ {
+ doClose(false);
+ }
public void close()
{
+ doClose(true);
+ }
+
+ protected void doClose(final boolean sendClose)
+ {
if (closed)
{
if (log.isDebugEnabled())
@@ -1176,7 +1186,14 @@
for (ClientSessionFactory factory : clonedFactory)
{
- factory.close();
+ if (sendClose)
+ {
+ factory.close();
+ }
+ else
+ {
+ factory.cleanup();
+ }
}
factories.clear();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -39,6 +39,8 @@
void setNodeID(String nodeID);
String getNodeID();
+
+ void cleanup();
ClientSessionFactory connect() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -104,7 +104,7 @@
private volatile ServerLocatorInternal backupServerLocator;
- private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
+ private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
private final Executor executor;
@@ -143,21 +143,21 @@
this.clustered = clustered;
}
-
+
public String describe()
{
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
+
out.println("Information on " + this);
out.println("*******************************************************");
out.println("Topology: " + topology.describe("Toopology on " + this));
-
+
for (ClusterConnection conn : this.clusterConnections.values())
{
out.println(conn.describe());
}
-
+
out.println("*******************************************************");
return str.toString();
@@ -167,7 +167,7 @@
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -204,56 +204,60 @@
started = true;
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
- if (!started)
+ synchronized (this)
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- if (clustered)
- {
- for (BroadcastGroup group : broadcastGroups.values())
+ if (clustered)
{
- group.stop();
- managementService.unregisterBroadcastGroup(group.getName());
+ for (BroadcastGroup group : broadcastGroups.values())
+ {
+ group.stop();
+ managementService.unregisterBroadcastGroup(group.getName());
+ }
+
+ broadcastGroups.clear();
+
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ clusterConnection.stop();
+ managementService.unregisterCluster(clusterConnection.getName().toString());
+ }
+
}
- broadcastGroups.clear();
-
- for (ClusterConnection clusterConnection : clusterConnections.values())
+ for (Bridge bridge : bridges.values())
{
- clusterConnection.stop();
- managementService.unregisterCluster(clusterConnection.getName().toString());
+ bridge.stop();
+ managementService.unregisterBridge(bridge.getName().toString());
}
- }
+ bridges.clear();
- for (Bridge bridge : bridges.values())
- {
- bridge.stop();
- managementService.unregisterBridge(bridge.getName().toString());
+ if (backupServerLocator != null)
+ {
+ backupServerLocator.close();
+ backupServerLocator = null;
+ }
}
- bridges.clear();
-
- if (backupServerLocator != null)
+ for (ServerLocatorInternal clusterLocator : clusterLocators)
{
- backupServerLocator.close();
- backupServerLocator = null;
- }
-
- executor.execute(new Runnable()
- {
- public void run()
+ try
{
- for (ServerLocator clusterLocator : clusterLocators)
- {
- clusterLocator.close();
- }
- clusterLocators.clear();
+ clusterLocator.close();
}
- });
+ catch (Exception e)
+ {
+ log.warn("Error closing serverLocator=" + clusterLocator + ", message=" + e.getMessage(), e);
+ }
+ }
+ clusterLocators.clear();
started = false;
clusterConnections.clear();
@@ -265,9 +269,9 @@
{
return;
}
-
- log.debug(this + "::removing nodeID=" + nodeID, new Exception ("trace"));
+ log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
+
topology.removeMember(nodeID);
}
@@ -284,22 +288,32 @@
TopologyMember member = new TopologyMember(connectorPair);
boolean updated = topology.addMember(nodeID, member, last);
-
+
if (!updated)
{
if (log.isDebugEnabled())
{
- log.debug(this + " ignored notifyNodeUp on nodeID=" + nodeID + " pair=" + connectorPair + " as the topology already knew about it");
+ log.debug(this + " ignored notifyNodeUp on nodeID=" +
+ nodeID +
+ " pair=" +
+ connectorPair +
+ " as the topology already knew about it");
}
return;
}
if (log.isDebugEnabled())
{
- log.debug(this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair +
- ", nodeAnnounce=" + nodeAnnounce + ", last=" + last);
+ log.debug(this + " received notifyNodeUp nodeID=" +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ ", nodeAnnounce=" +
+ nodeAnnounce +
+ ", last=" +
+ last);
}
-
+
// if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
// connections.
if (nodeAnnounce)
@@ -312,8 +326,14 @@
{
if (log.isTraceEnabled())
{
- log.trace(this + " information clusterConnection=" + clusterConnection +
- " nodeID=" + nodeID + " connectorPair=" + connectorPair + " last=" + last);
+ log.trace(this + " information clusterConnection=" +
+ clusterConnection +
+ " nodeID=" +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ " last=" +
+ last);
}
clusterConnection.nodeUP(nodeID, connectorPair, last);
}
@@ -350,17 +370,17 @@
topology.addClusterTopologyListener(listener);
// We now need to send the current topology to the client
- executor.execute(new Runnable(){
+ executor.execute(new Runnable()
+ {
public void run()
{
topology.sendTopology(listener);
-
+
}
});
}
- public void removeClusterTopologyListener(final ClusterTopologyListener listener,
- final boolean clusterConnection)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topology.removeClusterTopologyListener(listener);
}
@@ -380,8 +400,9 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- //swap backup as live and send it to everybody
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
+ // swap backup as live and send it to everybody
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
+ null));
topology.addMember(nodeID, member, false);
if (backupServerLocator != null)
@@ -434,7 +455,7 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
+
topology.sendMemberToListeners(nodeID, member);
}
}
@@ -460,7 +481,7 @@
log.warn("no cluster connections defined, unable to announce backup");
}
}
-
+
void addClusterLocator(final ServerLocatorInternal serverLocator)
{
this.clusterLocators.add(serverLocator);
@@ -681,7 +702,7 @@
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
-
+
// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
serverLocator.setInitialConnectAttempts(-1);
@@ -693,12 +714,12 @@
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
if (!config.isUseDuplicateDetection())
{
- log.debug("Bridge " + config.getName() +
+ log.debug("Bridge " + config.getName() +
" is configured to not use duplicate detecion, it will send messages synchronously");
}
-
+
clusterLocators.add(serverLocator);
-
+
Bridge bridge = new BridgeImpl(serverLocator,
config.getReconnectAttempts(),
config.getRetryInterval(),
@@ -731,7 +752,7 @@
public void destroyBridge(final String name) throws Exception
{
Bridge bridge;
-
+
synchronized (this)
{
bridge = bridges.remove(name);
@@ -741,7 +762,7 @@
managementService.unregisterBridge(name);
}
}
-
+
bridge.flushExecutor();
}
@@ -790,10 +811,13 @@
"'. The cluster connection will not be deployed.");
return;
}
-
+
if (log.isDebugEnabled())
{
- log.debug(this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
+ log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
+ config.getDiscoveryGroupName() +
+ ", dg=" +
+ dg);
}
clusterConnection = new ClusterConnectionImpl(this,
@@ -828,7 +852,7 @@
{
TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
: null;
-
+
if (log.isDebugEnabled())
{
log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
@@ -869,7 +893,7 @@
if (log.isDebugEnabled())
{
- log.debug("ClusterConnection.start at " + clusterConnection, new Exception ("trace"));
+ log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
clusterConnection.start();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-12 02:20:12 UTC (rev 11194)
@@ -2059,6 +2059,7 @@
{
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
+ Thread.sleep(500);
ClusterTestBase.log.info("server " + node + " stopped");
}
catch (Exception e)
13 years, 5 months
JBoss hornetq SVN: r11193 - branches/Branch_2_2_EAP/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 20:05:39 -0400 (Thu, 11 Aug 2011)
New Revision: 11193
Modified:
branches/Branch_2_2_EAP/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
Log:
fixing typo on code
Modified: branches/Branch_2_2_EAP/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-08-11 23:19:56 UTC (rev 11192)
+++ branches/Branch_2_2_EAP/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-08-12 00:05:39 UTC (rev 11193)
@@ -33,7 +33,7 @@
{
protected ExecutorService threadPool;
protected QueueServiceManager queueManager = new QueueServiceManager();
- protected TopicServiceManager topicManager = new TopicServiceManager();
+ protected TopicServiceManager topicManager = new TopicServiceManager();
protected TimeoutTask timeoutTask;
protected int timeoutTaskInterval = 1;
protected MessageServiceConfiguration configuration = new MessageServiceConfiguration();
@@ -178,7 +178,7 @@
queueManager.setLinkStrategy(linkStrategy);
queueManager.setRegistry(registry);
- queueManager.setServerLocator(defaultLocator);
+ topicManager.setServerLocator(defaultLocator);
topicManager.setSessionFactory(sessionFactory);
topicManager.setTimeoutTask(timeoutTask);
topicManager.setConsumerServerLocator(consumerLocator);
13 years, 5 months
JBoss hornetq SVN: r11192 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 19:19:56 -0400 (Thu, 11 Aug 2011)
New Revision: 11192
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
changing timeout
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-11 19:58:06 UTC (rev 11191)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-11 23:19:56 UTC (rev 11192)
@@ -937,7 +937,7 @@
boolean failed = true;
- long timeout = System.currentTimeMillis() + 10000;
+ long timeout = System.currentTimeMillis() + 60000;
while (failed && timeout > System.currentTimeMillis())
{
buffer = new StringBuffer();
13 years, 5 months
JBoss hornetq SVN: r11191 - in branches/Branch_2_2_EAP: src/main/org/hornetq/utils and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 15:58:06 -0400 (Thu, 11 Aug 2011)
New Revision: 11191
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java
Log:
https://issues.jboss.org/browse/HORNETQ-746 - Fixing a deadlock with Netty NIO
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-08-11 17:09:49 UTC (rev 11190)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-08-11 19:58:06 UTC (rev 11191)
@@ -93,8 +93,6 @@
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
- private boolean largeMessageInDelivery;
-
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
@@ -235,7 +233,7 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageInDelivery)
+ if (largeMessageDeliverer != null)
{
return HandleStatus.BUSY;
}
@@ -463,21 +461,6 @@
synchronized (lock)
{
this.transferring = transferring;
-
- if (transferring)
- {
- // Now we must wait for any large message delivery to finish
- while (largeMessageInDelivery)
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
}
// Outside the lock
@@ -662,28 +645,30 @@
private void promptDelivery()
{
- synchronized (lock)
+ // largeMessageDeliverer is aways set inside a lock
+ // if we don't acquire a lock, we will have NPE eventually
+ if (largeMessageDeliverer != null)
{
- // largeMessageDeliverer is aways set inside a lock
- // if we don't acquire a lock, we will have NPE eventually
- if (largeMessageDeliverer != null)
- {
- resumeLargeMessage();
- }
- else
- {
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- messageQueue.forceDelivery();
- }
- }
+ resumeLargeMessage();
}
+ else
+ {
+ forceDelivery();
+ }
}
+ private void forceDelivery()
+ {
+ if (browseOnly)
+ {
+ messageQueue.getExecutor().execute(browserDeliverer);
+ }
+ else
+ {
+ messageQueue.deliverAsync();
+ }
+ }
+
private void resumeLargeMessage()
{
messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -691,8 +676,6 @@
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
{
- largeMessageInDelivery = true;
-
final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
// it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -714,6 +697,7 @@
}
}
+
// Inner classes
// ------------------------------------------------------------------------
@@ -727,16 +711,7 @@
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
{
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- // prompt Delivery only if chunk was finished
-
- messageQueue.deliverAsync();
- }
+ forceDelivery();
}
}
catch (Exception e)
@@ -901,8 +876,6 @@
largeMessageDeliverer = null;
- largeMessageInDelivery = false;
-
largeMessage = null;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java 2011-08-11 17:09:49 UTC (rev 11190)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/ConfigurationHelper.java 2011-08-11 19:58:06 UTC (rev 11191)
@@ -73,7 +73,7 @@
{
return Integer.valueOf((String)prop);
}
- else if (prop instanceof Integer == false)
+ else if (prop instanceof Number == false)
{
ConfigurationHelper.log.warn("Property " + propName +
" must be an Integer, it is " +
@@ -83,7 +83,7 @@
}
else
{
- return (Integer)prop;
+ return ((Number)prop).intValue();
}
}
}
@@ -108,7 +108,7 @@
{
return Long.valueOf((String)prop);
}
- else if (prop instanceof Long == false)
+ else if (prop instanceof Number == false)
{
ConfigurationHelper.log.warn("Property " + propName +
" must be an Long, it is " +
@@ -118,7 +118,7 @@
}
else
{
- return (Long)prop;
+ return ((Number)prop).longValue();
}
}
}
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java 2011-08-11 19:58:06 UTC (rev 11191)
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * -- https://issues.jboss.org/browse/HORNETQ-746
+ * Stress test using netty with NIO and many JMS clients concurrently, to try
+ * and induce a deadlock.
+ * <p>
+ * A large number of JMS clients are started concurrently. Some produce to queue
+ * 1 over one connection, others consume from queue 1 and produce to queue 2
+ * over a second connection, and others consume from queue 2 over a third
+ * connection.
+ * <p>
+ * Each operation is done in a JMS transaction, sending/consuming one message
+ * per transaction.
+ * <p>
+ * The server is set up with netty, with only one NIO worker and 1 hornetq
+ * server worker. This increases the chance for the deadlock to occur.
+ * <p>
+ * If the deadlock occurs, all threads will block/die. A simple transaction
+ * counting strategy is used to verify that the count has reached the expected
+ * value.
+ * @author Carl Heymann
+ */
+public class JmsNettyNioStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Remove this method to re-enable those tests
+ public void testStressSendNetty() throws Exception
+ {
+ doTestStressSend(true);
+ }
+
+ public void doTestStressSend(final boolean netty) throws Exception
+ {
+ // first set up the server
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PORT_PROP_NAME, 5445);
+ params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ params.put(TransportConstants.USE_NIO_PROP_NAME, true);
+ // minimize threads to maximize possibility for deadlock
+ params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 1);
+ params.put(TransportConstants.BATCH_DELAY, 50);
+ Configuration config = UnitTestCase.createDefaultConfig(params, ServiceTestBase.NETTY_ACCEPTOR_FACTORY);
+ HornetQServer server = createServer(true, config);
+ server.getConfiguration().setThreadPoolMaxSize(2);
+ server.start();
+
+ // now the client side
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(TransportConstants.PORT_PROP_NAME, 5445);
+ connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true);
+ connectionParams.put(TransportConstants.BATCH_DELAY, 50);
+ connectionParams.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 6);
+ final TransportConfiguration transpConf = new TransportConfiguration(NettyConnectorFactory.class.getName(),
+ connectionParams);
+ final ServerLocator locator = createNonHALocator(netty);
+
+ // each thread will do this number of transactions
+ final int numberOfMessages = 100;
+
+ // these must all be the same
+ final int numProducers = 30;
+ final int numConsumerProducers = 30;
+ final int numConsumers = 30;
+
+ // each produce, consume+produce and consume increments this counter
+ final AtomicInteger totalCount = new AtomicInteger(0);
+
+ // the total we expect if all producers, consumer-producers and
+ // consumers complete normally
+ int totalExpectedCount = (numProducers + numConsumerProducers + numConsumerProducers) * numberOfMessages;
+
+ // each group gets a separate connection
+ final Connection connectionProducer;
+ final Connection connectionConsumerProducer;
+ final Connection connectionConsumer;
+
+ // create the 2 queues used in the test
+ ClientSessionFactory sf = locator.createSessionFactory(transpConf);
+ ClientSession session = sf.createTransactedSession();
+ session.createQueue("jms.queue.queue", "jms.queue.queue");
+ session.createQueue("jms.queue.queue2", "jms.queue.queue2");
+ session.commit();
+ sf.close();
+ session.close();
+ locator.close();
+
+ // create and start JMS connections
+ HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transpConf);
+ connectionProducer = cf.createConnection();
+ connectionProducer.start();
+
+ connectionConsumerProducer = cf.createConnection();
+ connectionConsumerProducer.start();
+
+ connectionConsumer = cf.createConnection();
+ connectionConsumer.start();
+
+ // these threads produce messages on the the first queue
+ for (int i = 0; i < numProducers; i++)
+ {
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+
+ Session session = null;
+ try
+ {
+ session = connectionProducer.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue"));
+ messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[3000]);
+ message.setStringProperty("Service", "LoadShedService");
+ message.setStringProperty("Action", "testAction");
+
+ messageProducer.send(message);
+ session.commit();
+
+ totalCount.incrementAndGet();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }.start();
+ }
+
+ // these threads just consume from the one and produce on a second queue
+ for (int i = 0; i < numConsumerProducers; i++)
+ {
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ Session session = null;
+ try
+ {
+ session = connectionConsumerProducer.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue"));
+ MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue2"));
+ messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(5000);
+ if (message == null)
+ {
+ return;
+ }
+ message = session.createBytesMessage();
+ message.writeBytes(new byte[3000]);
+ message.setStringProperty("Service", "LoadShedService");
+ message.setStringProperty("Action", "testAction");
+ messageProducer.send(message);
+ session.commit();
+
+ totalCount.incrementAndGet();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }.start();
+ }
+
+ // these threads consume from the second queue
+ for (int i = 0; i < numConsumers; i++)
+ {
+ new Thread()
+ {
+ @Override
+ public void run()
+ {
+ Session session = null;
+ try
+ {
+ session = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue2"));
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(5000);
+ if (message == null)
+ {
+ return;
+ }
+ session.commit();
+
+ totalCount.incrementAndGet();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }.start();
+ }
+
+ // check that the overall transaction count reaches the expected number,
+ // which would indicate that the system didn't stall
+ int timeoutCounter = 0;
+ int maxSecondsToWait = 60;
+ while (timeoutCounter < maxSecondsToWait && totalCount.get() < totalExpectedCount)
+ {
+ timeoutCounter++;
+ Thread.sleep(1000);
+ System.out.println("Not done yet.. " + (maxSecondsToWait - timeoutCounter) + "; " + totalCount.get());
+ }
+ System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount);
+ Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get());
+ System.out.println("After assert");
+
+ // attempt cleaning up (this is not in a finally, still needs some work)
+ connectionProducer.close();
+ connectionConsumerProducer.close();
+ connectionConsumer.close();
+
+ server.stop();
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
13 years, 5 months
JBoss hornetq SVN: r11190 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-11 13:09:49 -0400 (Thu, 11 Aug 2011)
New Revision: 11190
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 (test case) Restore original handler to channel after delayed backup is up-to-date.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-11 17:09:12 UTC (rev 11189)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-11 17:09:49 UTC (rev 11190)
@@ -381,7 +381,7 @@
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull("null message", message);
+ Assert.assertNull("message should be null! Was: " + message, message);
session.close();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-08-11 17:09:12 UTC (rev 11189)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-08-11 17:09:49 UTC (rev 11190)
@@ -94,7 +94,8 @@
for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
{
- Assert.assertEquals("equal at " + j, buffer.readByte(), UnitTestCase.getSamplebyte(j));
+ Assert.assertTrue("expecting more bytes", buffer.readable());
+ Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
}
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-11 17:09:12 UTC (rev 11189)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-11 17:09:49 UTC (rev 11190)
@@ -111,6 +111,7 @@
finally
{
handler.setChannel(channel);
+ channel.setHandler(handler);
onHold = null;
}
}
13 years, 5 months
JBoss hornetq SVN: r11189 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-11 13:09:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11189
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
Log:
HORNETQ-720 Add number of operations to COMMIT/PREPARE journal entries
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-11 17:08:43 UTC (rev 11188)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-11 17:09:12 UTC (rev 11189)
@@ -1,6 +1,9 @@
package org.hornetq.core.journal.impl;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.api.core.HornetQException;
@@ -29,6 +32,8 @@
private final ReentrantLock lockAppend = new ReentrantLock();
// private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+ private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
private final JournalFile currentFile;
/**
@@ -62,16 +67,6 @@
// ------------------------
-// private void readLockJournal()
-// {
-// journalLock.readLock().lock();
-// }
-//
-// private void readUnlockJournal()
-// {
-// journalLock.readLock().unlock();
-// }
-
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
throws Exception
@@ -84,10 +79,10 @@
/**
* Write the record to the current file.
*/
- private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+ private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback)
+ throws Exception
{
-
lockAppend.lock();
try
{
@@ -130,6 +125,7 @@
public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
+ count(txID);
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
writeRecord(addRecord, false, null);
}
@@ -147,6 +143,7 @@
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
+ count(txID);
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
writeRecord(updateRecordTX, false, null);
}
@@ -156,7 +153,13 @@
throws Exception
{
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
- writeRecord(commitRecord, sync, callback);
+ AtomicInteger value = transactions.remove(Long.valueOf(txID));
+ if (value != null)
+ {
+ commitRecord.setNumberOfRecords(value.get());
+ }
+
+ writeRecord(commitRecord, true, callback);
}
@Override
@@ -164,9 +167,27 @@
throws Exception
{
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
+ AtomicInteger value = transactions.get(Long.valueOf(txID));
+ if (value != null)
+ {
+ prepareRecord.setNumberOfRecords(value.get());
+ }
writeRecord(prepareRecord, sync, callback);
}
+ private int count(long txID) throws HornetQException
+ {
+ AtomicInteger defaultValue = new AtomicInteger(1);
+ AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
+ if (count != null)
+ {
+ return count.incrementAndGet();
+ }
+ return defaultValue.get();
+ }
+
+ // UNSUPPORTED STUFF
+
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
{
13 years, 5 months
JBoss hornetq SVN: r11188 - in branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl: dataformat and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-11 13:08:43 -0400 (Thu, 11 Aug 2011)
New Revision: 11188
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
Log:
HORNETQ-720 Rewrite javadoc as old information was incorrect.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-11 17:08:09 UTC (rev 11187)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-11 17:08:43 UTC (rev 11188)
@@ -1186,19 +1186,7 @@
/**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ * Regarding the number of operations in a given file see {@link JournalCompleteRecordTX}.
*/
@Override
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
@@ -2596,11 +2584,13 @@
// -----------------------------------------------------------------------------
/**
- * <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
- * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
- *
- * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about how the transaction-summary is recorded</p>
- *
+ * <p>
+ * Checks for holes on the transaction (a commit written but with an incomplete transaction).
+ * <p>
+ * This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the
+ * COMMIT-RECORD.
+ * <p>
+ * For details see {@link JournalCompleteRecordTX} about how the transaction-summary is recorded.
* @param journalTransaction
* @param orderedFiles
* @param recordedSummary
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2011-08-11 17:08:09 UTC (rev 11187)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2011-08-11 17:08:43 UTC (rev 11188)
@@ -15,26 +15,23 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
/**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
+ * <p>
+ * A transaction record (Commit or Prepare), will hold the number of elements the transaction has in
+ * the current file.
+ * <p>
+ * While loading the {@link JournalFile}, the number of operations found is matched against this
+ * number. If for any reason there are missing operations, the transaction will be ignored.
+ * <p>
+ * We can't just use a global counter as reclaiming could delete files after the transaction was
+ * successfully committed. That also means not having a whole file on journal-reload doesn't mean we
+ * have to invalidate the transaction.
+ * <p>
+ * The commit operation itself is not included in this total.
 * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class JournalCompleteRecordTX extends JournalInternalRecord
{
@@ -70,7 +67,7 @@
}
buffer.writeInt(fileID);
-
+
buffer.writeByte(compactCount);
buffer.writeLong(txID);
13 years, 5 months
JBoss hornetq SVN: r11187 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-11 13:08:09 -0400 (Thu, 11 Aug 2011)
New Revision: 11187
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Add test for Replication Sync of LargeMessages
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-11 16:56:03 UTC (rev 11186)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-11 17:08:09 UTC (rev 11187)
@@ -135,7 +135,7 @@
@Override
protected void crash(ClientSession... sessions) throws Exception
{
- if (backupServer != null)
+ if (backupServer != null && backupServer.isStarted())
{
// some tests crash the liveServer before the backupServer is sync'ed
waitForBackup(sf, 3);
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java 2011-08-11 17:08:09 UTC (rev 11187)
@@ -0,0 +1,26 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+
+public class ReplicatedLargeMessageWithDelayFailoverTest extends ReplicatedLargeMessageFailoverTest
+{
+
+ private BackupSyncDelay syncDelay;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ syncDelay = new BackupSyncDelay(backupServer, liveServer);
+ backupServer.start();
+ }
+
+ @Override
+ protected void crash(ClientSession... sessions) throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ super.crash(sessions);
+ }
+}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-11 16:56:03 UTC (rev 11186)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-11 17:08:09 UTC (rev 11187)
@@ -41,7 +41,8 @@
public void deliverUpToDateMsg()
{
- handler.deliver();
+ if (backup.isStarted())
+ handler.deliver();
}
public BackupSyncDelay(TestableServer backup, TestableServer live)
13 years, 5 months