Author: clebert.suconic(a)jboss.com
Date: 2011-08-08 12:04:32 -0400 (Mon, 08 Aug 2011)
New Revision: 11147
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
more changes to my branch
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -33,7 +33,9 @@
private final int windowSize;
- private boolean blocked;
+ private volatile boolean closed;
+
+ private boolean blocked;
private final SimpleString address;
@@ -68,15 +70,18 @@
if (!semaphore.tryAcquire(credits))
{
- this.blocked = true;
- try
+ if (!closed)
{
- semaphore.acquire(credits);
+ this.blocked = true;
+ try
+ {
+ semaphore.acquire(credits);
+ }
+ finally
+ {
+ this.blocked = false;
+ }
}
- finally
- {
- this.blocked = false;
- }
}
}
@@ -118,6 +123,7 @@
public void close()
{
// Closing a producer that is blocking should make it return
+ closed = true;
semaphore.release(Integer.MAX_VALUE / 2);
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -133,6 +133,8 @@
private final Map<Long, ClientConsumerInternal> consumers = new
LinkedHashMap<Long, ClientConsumerInternal>();
private volatile boolean closed;
+
+ private volatile boolean closing;
private final boolean autoCommitAcks;
@@ -857,6 +859,11 @@
log.debug("Calling close on session " + this);
}
+ synchronized (this)
+ {
+ closing = true;
+ }
+
try
{
producerCreditManager.close();
@@ -905,7 +912,7 @@
{
synchronized (this)
{
- if (closed)
+ if (closed/* || closing*/)
{
return;
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -16,6 +16,7 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.impl.ConfigurationImpl;
/**
@@ -48,6 +49,8 @@
private final long maxRetryInterval;
private final int reconnectAttempts;
+
+ private final long callTimeout;
private final boolean duplicateDetection;
@@ -83,6 +86,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -101,6 +105,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -119,6 +124,7 @@
this.reconnectAttempts = reconnectAttempts;
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
+ this.callTimeout = callTimeout;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
discoveryGroupName = null;
this.maxHops = maxHops;
@@ -146,6 +152,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -163,6 +170,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -178,6 +186,7 @@
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
+ this.callTimeout = callTimeout;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
@@ -237,6 +246,11 @@
return reconnectAttempts;
}
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
public String getConnectorName()
{
return connectorName;
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -1015,6 +1015,8 @@
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+ long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout",
HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e,
"retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
@@ -1069,6 +1071,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ callTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1087,6 +1090,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ callTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -95,6 +95,8 @@
private final long connectionTTL;
private final long retryInterval;
+
+ private final long callTimeout;
private final double retryIntervalMultiplier;
@@ -148,6 +150,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -218,6 +221,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
+
+ this.callTimeout = callTimeout;
this.clusterManagerTopology = clusterManagerTopology;
@@ -247,6 +252,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final long callTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -287,6 +293,8 @@
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
+
+ this.callTimeout = callTimeout;
this.useDuplicateDetection = useDuplicateDetection;
@@ -456,37 +464,26 @@
if (serverLocator != null)
{
+
+ if (!useDuplicateDetection)
+ {
+ log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
+ }
+
serverLocator.setNodeID(nodeUUID.toString());
serverLocator.setIdentity("(main-ClusterConnection::" +
server.toString() + ")");
-
serverLocator.setReconnectAttempts(0);
-
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
-
serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
serverLocator.setConnectionTTL(connectionTTL);
-
- if (confirmationWindowSize < 0)
- {
- // We can't have confirmationSize = -1 on the cluster Bridge
- // Otherwise we won't have confirmation working
- serverLocator.setConfirmationWindowSize(0);
- }
- else
- {
- serverLocator.setConfirmationWindowSize(confirmationWindowSize);
- }
-
- if (!useDuplicateDetection)
- {
- log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
- }
+ serverLocator.setConfirmationWindowSize(confirmationWindowSize);
// if not using duplicate detection, we will send blocked
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+ serverLocator.setCallTimeout(callTimeout);
if (retryInterval > 0)
{
@@ -702,7 +699,7 @@
targetLocator.setConnectionTTL(connectionTTL);
targetLocator.setInitialConnectAttempts(0);
-
targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+ targetLocator.setConfirmationWindowSize(confirmationWindowSize);
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -808,6 +808,7 @@
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
+ config.getCallTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
@@ -830,7 +831,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX " + this + " defining cluster connection
towards " + Arrays.toString(tcConfigs));
+ log.debug(this + " defining cluster connection towards " +
Arrays.toString(tcConfigs));
}
clusterConnection = new ClusterConnectionImpl(this,
@@ -845,6 +846,7 @@
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
+ config.getCallTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -1883,6 +1883,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
reconnectAttempts,
+ 1000,
true,
forwardWhenNoConsumers,
maxHops,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -14,6 +14,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -29,7 +30,6 @@
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
@@ -58,6 +58,8 @@
private JMSServerManagerImpl serverManager;
private InVMContext initialContext;
+
+ private AtomicInteger countErrors = new AtomicInteger();
private final String topicName = "my-topic";
@@ -162,11 +164,20 @@
"/cf");
}
- public void testFoo()
+ public void testFlood() throws Throwable
{
+ try
+ {
+ internalTestFlood();
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ throw e;
+ }
}
-
- public void _testFlood() throws Exception
+
+ public void internalTestFlood() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/cf");
@@ -174,26 +185,30 @@
final int numConsumers = 20;
- final int numMessages = 10000;
+ final int numMessages = 1000;
ProducerThread[] producers = new ProducerThread[numProducers];
+
+ Connection conn = cf.createConnection();
for (int i = 0; i < numProducers; i++)
{
- producers[i] = new ProducerThread(cf, numMessages);
+ producers[i] = new ProducerThread(conn, numMessages);
}
ConsumerThread[] consumers = new ConsumerThread[numConsumers];
for (int i = 0; i < numConsumers; i++)
{
- consumers[i] = new ConsumerThread(cf, numMessages);
+ consumers[i] = new ConsumerThread(conn, numMessages);
}
for (int i = 0; i < numConsumers; i++)
{
consumers[i].start();
}
+
+ conn.start();
for (int i = 0; i < numProducers; i++)
{
@@ -209,6 +224,10 @@
{
producers[i].join();
}
+
+ conn.close();
+
+ assertEquals(0, countErrors.get());
}
@@ -216,23 +235,16 @@
{
private final Connection connection;
- private final Session session;
+ private Session session;
- private final MessageProducer producer;
+ private MessageProducer producer;
private final int numMessages;
- ProducerThread(final ConnectionFactory cf, final int numMessages) throws Exception
+ ProducerThread(final Connection connection, final int numMessages) throws
Exception
{
- connection = cf.createConnection();
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- producer =
session.createProducer(HornetQJMSClient.createTopic("my-topic"));
-
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
this.numMessages = numMessages;
+ this.connection = connection;
}
@Override
@@ -240,6 +252,12 @@
{
try
{
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ producer =
session.createProducer(HornetQJMSClient.createTopic("my-topic"));
+
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
byte[] bytes = new byte[1000];
BytesMessage message = session.createBytesMessage();
@@ -255,12 +273,13 @@
// log.info("Producer " + this + " sent " + i);
// }
}
-
- connection.close();
+
+ session.close();
}
- catch (Exception e)
+ catch (Throwable e)
{
- e.printStackTrace();
+ e.printStackTrace(System.out);
+ countErrors.incrementAndGet();
}
}
}
@@ -269,22 +288,16 @@
{
private final Connection connection;
- private final Session session;
+ private Session session;
- private final MessageConsumer consumer;
+ private MessageConsumer consumer;
private final int numMessages;
- ConsumerThread(final ConnectionFactory cf, final int numMessages) throws Exception
+ ConsumerThread(final Connection conn, final int numMessages) throws Exception
{
- connection = cf.createConnection();
+ this.connection = conn;
- connection.start();
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- consumer =
session.createConsumer(HornetQJMSClient.createTopic("my-topic"));
-
this.numMessages = numMessages;
}
@@ -293,13 +306,18 @@
{
try
{
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ consumer =
session.createConsumer(HornetQJMSClient.createTopic("my-topic"));
+
for (int i = 0; i < numMessages; i++)
{
- Message msg = consumer.receive();
+ Message msg = consumer.receive(5000);
if (msg == null)
{
FloodServerTest.log.error("message is null");
+ countErrors.incrementAndGet();
break;
}
@@ -309,11 +327,12 @@
// }
}
- connection.close();
+ session.close();
}
- catch (Exception e)
+ catch (Throwable e)
{
- e.printStackTrace();
+ e.printStackTrace(System.out);
+ countErrors.incrementAndGet();
}
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-08
15:45:41 UTC (rev 11146)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-08
16:04:32 UTC (rev 11147)
@@ -954,7 +954,7 @@
if (failed)
{
- logAndSystemOut("Thread leaged on test " + this.getClass().getName() +
"::" +
+ logAndSystemOut("Thread leaked on test " + this.getClass().getName() +
"::" +
this.getName() + "\n" + buffer.toString());
fail("Thread leakage");
}
@@ -1014,6 +1014,19 @@
fail("invm registry still had acceptors registered");
}
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (AsynchronousFileImpl.getTotalMaxIO() != 0 &&
System.currentTimeMillis() > timeout)
+ {
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
if (AsynchronousFileImpl.getTotalMaxIO() != 0)
{
AsynchronousFileImpl.resetMaxAIO();