JBoss hornetq SVN: r11320 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-11 23:34:46 -0400 (Sun, 11 Sep 2011)
New Revision: 11320
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Removing check
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12 03:28:05 UTC (rev 11319)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12 03:34:46 UTC (rev 11320)
@@ -93,9 +93,9 @@
}
locators.clear();
super.tearDown();
- checkFreePort(5445);
- checkFreePort(5446);
- checkFreePort(5447);
+// checkFreePort(5445);
+// checkFreePort(5446);
+// checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
fail("InVMREgistry size > 0");
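For context, the checkFreePort calls commented out above refer to a helper that is not part of this diff. A minimal sketch of what such a teardown check typically looks like, assuming it simply verifies that nothing is still bound to the port (the actual ServiceTestBase implementation may differ):

import java.net.ServerSocket;

// Hypothetical stand-in for ServiceTestBase.checkFreePort: binding succeeds
// only if no acceptor is still listening on the port after tearDown.
public final class PortCheckSketch
{
   public static void checkFreePort(final int port)
   {
      try
      {
         ServerSocket socket = new ServerSocket(port);
         socket.close();
      }
      catch (Exception e)
      {
         throw new IllegalStateException("Port " + port + " is still in use after tearDown", e);
      }
   }
}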
JBoss hornetq SVN: r11319 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-11 23:28:05 -0400 (Sun, 11 Sep 2011)
New Revision: 11319
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -577,6 +577,11 @@
try
{
+ if (producer != null)
+ {
+ producer.close();
+ }
+
csf.cleanup();
}
catch (Throwable dontCare)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -415,10 +415,13 @@
{
public void run()
{
- if (serverLocator != null)
+ synchronized (ClusterConnectionImpl.this)
{
- serverLocator.close();
- serverLocator = null;
+ if (serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
}
}
@@ -707,6 +710,23 @@
final boolean start) throws Exception
{
final ServerLocatorInternal targetLocator = new ServerLocatorImpl(clusterManagerTopology, false, connector);
+
+ String nodeId;
+
+ synchronized (this)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (serverLocator == null)
+ {
+ return;
+ }
+
+ nodeId = serverLocator.getNodeID();
+ }
targetLocator.setReconnectAttempts(0);
@@ -725,7 +745,7 @@
targetLocator.setAfterConnectionInternalListener(this);
- targetLocator.setNodeID(serverLocator.getNodeID());
+ targetLocator.setNodeID(nodeId);
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -38,7 +38,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
@@ -96,8 +95,6 @@
TransportConstants.DEFAULT_PORT + 8,
TransportConstants.DEFAULT_PORT + 9, };
- private static final long WAIT_TIMEOUT = 10000;
-
protected int getLargeMessageSize()
{
return 500;
@@ -272,42 +269,6 @@
throw new IllegalStateException(msg);
}
- protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
- {
- waitForTopology(server, nodes, WAIT_TIMEOUT);
- }
-
- protected void waitForTopology(final HornetQServer server, final int nodes, final long timeout) throws Exception
- {
- log.debug("waiting for " + nodes + " on the topology for server = " + server);
-
- long start = System.currentTimeMillis();
-
- Topology topology = server.getClusterManager().getTopology();
-
- do
- {
- if (nodes == topology.getMembers().size())
- {
- return;
- }
-
- Thread.sleep(10);
- }
- while (System.currentTimeMillis() - start < timeout);
-
- String msg = "Timed out waiting for cluster topology of " + nodes +
- " (received " +
- topology.getMembers().size() +
- ") topology = " +
- topology +
- ")";
-
- log.error(msg);
-
- throw new Exception(msg);
- }
-
protected void waitForBindings(final int node,
final String address,
final int expectedBindingCount,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -83,13 +83,13 @@
// topic1 and 2 should be the same.
// Using a different instance here just to make sure it is implemented correctly
MessageConsumer cons2 = session2.createDurableSubscriber(topic2, "sub2");
- Thread.sleep(2000);
+ Thread.sleep(500);
MessageProducer prod1 = session1.createProducer(topic1);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 2; i++)
{
prod1.send(session1.createTextMessage("someMessage"));
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -118,6 +118,10 @@
jmsServer2.start();
jmsServer2.activated();
waitForServer(jmsServer2.getHornetQServer());
+
+ waitForTopology(jmsServer1.getHornetQServer(), 2);
+
+ waitForTopology(jmsServer2.getHornetQServer(), 2);
cf1 = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(),
generateInVMParams(0)));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12 02:25:45 UTC (rev 11318)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12 03:28:05 UTC (rev 11319)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -60,6 +61,9 @@
{
// Constants -----------------------------------------------------
+
+ protected static final long WAIT_TIMEOUT = 10000;
+
// Attributes ----------------------------------------------------
@@ -98,6 +102,43 @@
}
}
+ protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
+ {
+ waitForTopology(server, nodes, WAIT_TIMEOUT);
+ }
+
+ protected void waitForTopology(final HornetQServer server, final int nodes, final long timeout) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server = " + server);
+
+ long start = System.currentTimeMillis();
+
+ Topology topology = server.getClusterManager().getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < timeout);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
+ log.error(msg);
+
+ throw new Exception(msg);
+ }
+
+
protected static Map<String, Object> generateParams(final int node, final boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
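The ClusterConnectionImpl change above guards serverLocator with the connection's own lock and captures the node ID into a local before using it, so a concurrent close cannot null the locator out from under the connector setup. A minimal standalone sketch of that pattern, using illustrative names rather than the actual HornetQ types:

// Illustrative only: read shared state under the lock into a local,
// then do the slower work outside the lock so close() is never blocked.
public class LockedCaptureSketch
{
   private boolean started;
   private String nodeID;   // stands in for serverLocator.getNodeID()

   public void connectToTarget()
   {
      String capturedNodeID;

      synchronized (this)
      {
         if (!started || nodeID == null)
         {
            return;
         }
         capturedNodeID = nodeID;
      }

      // use the captured value outside the synchronized block
      System.out.println("connecting with node ID " + capturedNodeID);
   }

   public synchronized void close()
   {
      started = false;
      nodeID = null;
   }
}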
JBoss hornetq SVN: r11318 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-11 22:25:45 -0400 (Sun, 11 Sep 2011)
New Revision: 11318
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
Trying to fix a test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12 02:25:45 UTC (rev 11318)
@@ -863,10 +863,6 @@
{
log.debug("stopping bridge " + BridgeImpl.this);
- queue.removeConsumer(BridgeImpl.this);
-
- internalCancelReferences();
-
if (session != null)
{
log.debug("Cleaning up session " + session);
@@ -886,6 +882,10 @@
csf.cleanup();
}
+ queue.removeConsumer(BridgeImpl.this);
+
+ internalCancelReferences();
+
synchronized (BridgeImpl.this)
{
log.debug("Closing Session for bridge " + BridgeImpl.this.name);
JBoss hornetq SVN: r11317 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 6 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-11 21:47:54 -0400 (Sun, 11 Sep 2011)
New Revision: 11317
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
fixed some stomp tests; added some dirty debug logs, to be removed later.
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -238,12 +238,15 @@
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
+ log.error("---------------------- route to binding: " + binding);
binding.getBindable().route(message, context);
routed = true;
}
}
}
+
+ log.error("-------- now routed is: " + routed);
if (!routed)
{
@@ -276,6 +279,7 @@
if (theBinding != null)
{
+ log.error("------------------- route theBinding: " + theBinding + " mesage: " + message);
theBinding.route(message, context);
}
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -591,8 +591,11 @@
cleanupInternalPropertiesBeforeRouting(message);
}
+ log.error("----------get address: " + address + " addressManager: " + addressManager);
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+
+ log.error("-------------------Bindings: " + bindings);
if (bindings != null)
{
@@ -631,6 +634,7 @@
}
else
{
+ log.error("----------processing route: " + context + " direct " + direct);
processRoute(message, context, direct);
}
@@ -967,6 +971,8 @@
message.incrementRefCount();
}
}
+
+ log.error("In processing, tx: " + tx);
if (tx != null)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -220,6 +220,7 @@
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
+ log.error("------------------------ write buffer " + connection);
connection.getTransportConnection().write(buffer, flush, batch);
}
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -73,6 +73,8 @@
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+ log.error("------------------channel sent " + channel);
+
channel.sendBatched(packet);
int size = packet.getPacketSize();
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -55,6 +55,7 @@
private String clientID;
+ //this means login is valid. (stomp connection ok)
private boolean valid;
private boolean destroyed = false;
@@ -75,6 +76,7 @@
private VersionedStompFrameHandler frameHandler;
+ //this means the version negotiation done.
private boolean initialized;
private FrameEventListener stompListener;
@@ -459,6 +461,7 @@
public void sendFrame(StompFrame frame)
{
+ log.error("--------------- sending reply: " + frame);
manager.sendReply(this, frame);
}
@@ -518,7 +521,9 @@
}
try
{
+ log.error("--------------------- sending mesage: " + message);
stompSession.getSession().send(message, true);
+ log.error("----------------------sent by " + stompSession.getSession());
}
catch (Exception e)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -565,6 +565,8 @@
data = data - pos;
// reset
+
+ log.error("-------new Frame decoded: " + command + " headers " + headers + " content " + content);
StompFrame ret = new StompFrame(command, headers, content);
@@ -588,6 +590,8 @@
pos = 0;
command = null;
+
+ headers = new HashMap<String, String>();
this.headerBytesCopyStart = -1;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -94,7 +94,7 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=";
+ return "StompFrame[command=" + command + ", headers=" + headers + ", content= " + this.body + " bytes " + this.bytesBody;
}
public String asString()
@@ -113,7 +113,14 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ if (bytesBody != null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ }
+ else
+ {
+ buffer = HornetQBuffers.dynamicBuffer(512);
+ }
StringBuffer head = new StringBuffer();
head.append(command);
@@ -130,7 +137,10 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(bytesBody);
+ if (bytesBody != null)
+ {
+ buffer.writeBytes(bytesBody);
+ }
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
@@ -195,8 +205,15 @@
return headers.containsKey(key);
}
- public String getBody()
+ public String getBody() throws UnsupportedEncodingException
{
+ if (body == null)
+ {
+ if (bytesBody != null)
+ {
+ body = new String(bytesBody, "UTF-8");
+ }
+ }
return body;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -149,7 +149,7 @@
}
synchronized (connection)
{
- if (connection.isDestroyed() || !connection.isValid())
+ if (connection.isDestroyed())
{
log.warn("Connection closed " + connection);
return;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
@@ -39,18 +40,20 @@
*
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
-public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
public StompFrameHandlerV10(StompConnection connection)
{
this.connection = connection;
+ connection.addStompEventListener(this);
}
@Override
public StompFrame onConnect(StompFrame frame)
{
+ log.error("-----------------onConnection ()");
StompFrame response = null;
Map<String, String> headers = frame.getHeadersMap();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -58,12 +61,14 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+ log.error("------------ validating user: " + login + " code " + passcode);
if (connection.validateUser(login, passcode))
{
+ log.error("-------user OK!!!");
connection.setClientID(clientID);
connection.setValid(true);
- response = new StompFrame(Stomp.Responses.CONNECTED);
+ response = new StompFrameV10(Stomp.Responses.CONNECTED);
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
@@ -74,6 +79,7 @@
}
else
{
+ log.error("--------user NOT ok!!");
//not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -86,11 +92,6 @@
log.error("Encoding problem", e);
//then we will send a null body message.
}
-
- connection.sendFrame(response);
- connection.destroy();
-
- return null;
}
return response;
}
@@ -105,10 +106,12 @@
@Override
public StompFrame onSend(StompFrame frame)
{
+ log.error("-------------on Send: " + frame);
StompFrame response = null;
try
{
connection.validate();
+ log.error("-----------connection is valid");
String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
@@ -120,11 +123,13 @@
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
{
+ log.error("--------------------------------it's a bryte type");
message.setType(Message.BYTES_TYPE);
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
}
else
{
+ log.error("------------------ it's a text type");
message.setType(Message.TEXT_TYPE);
String text = frame.getBody();
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
@@ -367,4 +372,21 @@
return decoder.defaultDecode(buffer);
}
+ @Override
+ public void replySent(StompFrame reply)
+ {
+ log.error("-----------------------need destroy? " + reply.needsDisconnect());
+ if (reply.needsDisconnect())
+ {
+ connection.destroy();
+ }
+ }
+
+ @Override
+ public void requestAccepted(StompFrame request)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -93,4 +93,19 @@
return server;
}
+ public static HornetQServer newHornetQServer(Configuration config,
+ String defUser, String defPass)
+ {
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+ securityManager.addUser(defUser, defPass);
+
+ HornetQServer server = HornetQServers.newHornetQServer(config,
+ ManagementFactory.getPlatformMBeanServer(),
+ securityManager,
+ config.isPersistenceEnabled());
+
+ return server;
+ }
+
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -279,6 +279,7 @@
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
+ log.error("-------------------in queue route, context: " + context);
context.addQueue(address, this);
}
@@ -362,6 +363,7 @@
return;
}
+ log.error("----------------checkingDirect " + checkDirect);
// The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
@@ -384,10 +386,13 @@
checkDirect = false;
}
+ log.error("-----now direct " + direct + " directDeliver " + directDeliver );
if (direct && directDeliver && deliverDirect(ref))
{
return;
}
+
+ log.error("------- ok, adding ref to the queue");
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -396,6 +401,8 @@
directDeliver = false;
executor.execute(concurrentPoller);
+
+ log.error("-----------executing : " + concurrentPoller);
}
public void deliverAsync()
@@ -1946,7 +1953,10 @@
HandleStatus status;
try
{
+ log.error("-------------------Now let consumer " + consumer + " handle " + reference);
status = consumer.handle(reference);
+
+ log.error("-------------- returned status: " + status);
}
catch (Throwable t)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -286,6 +286,7 @@
}
else
{
+ log.error("--------------------- deliver standard");
deliverStandardMessage(ref, message);
}
@@ -695,6 +696,7 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
{
+ log.error("------------------ calling callback " + callback + " to send message");
int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
if (availableCredits != null)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -119,6 +119,7 @@
public void addUser(final String user, final String password)
{
+ log.error("-------------------------------adding user: " + user + " password " + password);
if (user == null)
{
throw new IllegalArgumentException("User cannot be null");
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -39,6 +39,8 @@
frame = receiveFrame(10000);
//We send and consumer a message to ensure a STOMP connection and server session is created
+
+ System.out.println("Received frame: " + frame);
Assert.assertTrue(frame.startsWith("CONNECTED"));
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -370,12 +370,13 @@
frame = receiveFrame(100000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
sendFrame(frame);
sendMessage(getName());
frame = receiveFrame(10000);
+ System.out.println("-------- frame received: " + frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-12 01:47:54 UTC (rev 11317)
@@ -39,7 +39,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -80,7 +79,11 @@
protected JMSServerManager server;
+ protected String defUser = "brianm";
+ protected String defPass = "wombats";
+
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -118,7 +121,7 @@
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config, defUser, defPass);
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations()
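The STOMP changes above revolve around CONNECT validation and frame parsing, and StompTestBase now starts the server with the defUser/defPass credentials "brianm"/"wombats" shown in the diff. For reference, the raw STOMP 1.0 frames the tests build as strings have roughly this shape (the queue name below is illustrative; Stomp.NULL in the test code corresponds to the \u0000 terminator appended here):

// Sketch of the frames driven over the socket by the STOMP tests above.
public final class StompFrameSketch
{
   public static void main(String[] args)
   {
      String connectFrame = "CONNECT\n" +
                            "login:brianm\n" +
                            "passcode:wombats\n" +
                            "\n" +
                            "\u0000";

      String subscribeFrame = "SUBSCRIBE\n" +
                              "destination:jms.queue.exampleQueue\n" +
                              "ack:auto\n" +
                              "\n" +
                              "\u0000";

      System.out.println(connectFrame + subscribeFrame);
   }
}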
JBoss hornetq SVN: r11316 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-09 22:13:50 -0400 (Fri, 09 Sep 2011)
New Revision: 11316
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-765 - fixing a failing test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-10 02:13:50 UTC (rev 11316)
@@ -86,7 +86,15 @@
final PageSubscription subscription)
{
this.position = position;
- this.messageEstimate = message.getMessage().getMemoryEstimate();
+
+ if (message == null)
+ {
+ this.messageEstimate = -1;
+ }
+ else
+ {
+ this.messageEstimate = message.getMessage().getMemoryEstimate();
+ }
this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -112,6 +120,10 @@
*/
public int getMessageMemoryEstimate()
{
+ if (messageEstimate < 0)
+ {
+ messageEstimate = getMessage().getMemoryEstimate();
+ }
return messageEstimate;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-09-09 21:41:06 UTC (rev 11315)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-09-10 02:13:50 UTC (rev 11316)
@@ -263,6 +263,8 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
+
+ waitForServer(server);
queue = server.locateQueue(ADDRESS);
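The PagedReferenceImpl change above defers the memory estimate because the constructor may receive a null message and only holds it through a WeakReference, so the estimate is resolved on first use instead. A minimal standalone sketch of that sentinel-plus-lazy-computation pattern (names are illustrative only):

final class LazyEstimateSketch
{
   private int messageEstimate = -1;   // -1 means "not computed yet"

   int getMessageMemoryEstimate()
   {
      if (messageEstimate < 0)
      {
         messageEstimate = computeEstimate();   // resolved only when needed
      }
      return messageEstimate;
   }

   private int computeEstimate()
   {
      return 1024;   // stands in for getMessage().getMemoryEstimate()
   }
}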
JBoss hornetq SVN: r11315 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-09 17:41:06 -0400 (Fri, 09 Sep 2011)
New Revision: 11315
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
HORNETQ-753 and JBPAPP-7115
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,28 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +323,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +340,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
newMessage.linkMessage = null;
@@ -341,9 +364,9 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
+ return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -61,6 +61,7 @@
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
+ this.message = message;
}
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -455,7 +455,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -467,10 +467,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
@@ -535,6 +545,12 @@
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
// with any messages resent
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("XXX going to send message " + message);
+ }
+
try
{
producer.send(dest, message);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -169,34 +169,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
@@ -275,7 +285,7 @@
{
return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
",expiration=" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : 0) +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
// FIXME - this is stuff that is only used in large messages
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -1073,6 +1073,11 @@
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+ if (log.isTraceEnabled())
+ {
+ log.trace("sendLarge::" + largeMsg);
+ }
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -74,7 +74,17 @@
public abstract class ClusterTestBase extends ServiceTestBase
{
private final Logger log = Logger.getLogger(this.getClass());
+ public ClusterTestBase()
+ {
+ super();
+ }
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -87,7 +97,18 @@
TransportConstants.DEFAULT_PORT + 9, };
private static final long WAIT_TIMEOUT = 10000;
+
+ protected int getLargeMessageSize()
+ {
+ return 500;
+ }
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
+
private static final long TIMEOUT_START_SERVER = 10;
@Override
@@ -635,6 +656,11 @@
}
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
producer.send(message);
@@ -686,9 +712,15 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
}
}
@@ -881,6 +913,12 @@
log.info("msg on ClusterTestBase = " + message);
+
+ if (isLargeMessage())
+ {
+ checkMessageBody(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -1180,7 +1218,10 @@
if (message != null)
{
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+
+ checkMessageBody(message);
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1241,6 +1282,20 @@
}
+ /**
+ * @param message
+ */
+ private void checkMessageBody(ClientMessage message)
+ {
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+ }
+
protected void verifyReceiveRoundRobinInSomeOrderNoAck(final int numMessages, final int... consumerIDs) throws Exception
{
if (numMessages < consumerIDs.length)
@@ -1274,6 +1329,12 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ checkMessageBody(message);
+ }
+
if (ack)
{
message.acknowledge();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-09 20:31:28 UTC (rev 11314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-09 21:41:06 UTC (rev 11315)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -113,101 +123,11 @@
removeConsumer(1);
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+ verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(1000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);
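The ClusterTestBase changes above rely on createFakeLargeStream and getSamplebyte, which live elsewhere in the test utilities and are not shown in this diff. A sketch of what such helpers typically look like, assuming a deterministic, position-based byte pattern so checkMessageBody can verify every byte (the actual UnitTestCase implementations may differ):

import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical stand-ins for the helpers referenced by ClusterTestBase above.
final class LargeMessageHelperSketch
{
   static byte getSamplebyte(final long position)
   {
      // deterministic content derived from the byte position
      return (byte)('a' + position % ('z' - 'a' + 1));
   }

   static InputStream createFakeLargeStream(final int size)
   {
      byte[] data = new byte[size];
      for (int i = 0; i < size; i++)
      {
         data[i] = getSamplebyte(i);
      }
      return new ByteArrayInputStream(data);
   }
}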
JBoss hornetq SVN: r11314 - in branches/Branch_2_2_EAP: src/main/org/hornetq/api/core/client and 22 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-09 16:31:28 -0400 (Fri, 09 Sep 2011)
New Revision: 11314
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
Modified:
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Clustering fixes: add versioned topology packets (SUBSCRIBE_TOPOLOGY_V2, CLUSTER_TOPOLOGY_V2) and stamp every topology change with a unique event ID so stale node-up/node-down notifications can be discarded
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-09-09 20:31:28 UTC (rev 11314)
@@ -2,8 +2,8 @@
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=8
-hornetq.version.incrementingVersion=121
+hornetq.version.incrementingVersion=122
hornetq.version.versionSuffix=CR1
hornetq.version.versionTag=CR1
hornetq.netty.version=@NETTY.VERSION@
-hornetq.version.compatibleVersionList=121
+hornetq.version.compatibleVersionList=121,122
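The hunk above bumps the incrementing version to 122 while keeping 121 on the compatible list, so a 2.2.8 server can still serve older 121 clients. As a rough illustration of how a comma-separated compatible-version list like this can be consumed (this helper is hypothetical and not part of the commit):

import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of a compatible-version check driven by
// hornetq.version.compatibleVersionList; not code from this commit.
public class CompatibleVersionCheck
{
   private final Set<Integer> compatible = new HashSet<Integer>();

   public CompatibleVersionCheck(final String compatibleVersionList)
   {
      for (String token : compatibleVersionList.split(","))
      {
         compatible.add(Integer.valueOf(token.trim()));
      }
   }

   public boolean isCompatible(final int clientIncrementingVersion)
   {
      return compatible.contains(clientIncrementingVersion);
   }

   public static void main(String[] args)
   {
      CompatibleVersionCheck check = new CompatibleVersionCheck("121,122");
      System.out.println(check.isCompatible(121)); // true
      System.out.println(check.isCompatible(120)); // false
   }
}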
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
- void nodeDown(String nodeID);
+ void nodeDown(long eventUID, String nodeID);
}
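Both callbacks now carry the unique event ID of the topology change that triggered them. A minimal sketch of a listener written against the updated interface (the printouts are illustrative only):

import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;

// Minimal sketch of the updated listener contract; it assumes only what the
// interface above declares.
public class LoggingTopologyListener implements ClusterTopologyListener
{
   public void nodeUP(final long eventUID,
                      final String nodeID,
                      final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                      final boolean last)
   {
      System.out.println("node up: " + nodeID + " eventUID=" + eventUID + " last=" + last);
   }

   public void nodeDown(final long eventUID, final String nodeID)
   {
      System.out.println("node down: " + nodeID + " eventUID=" + eventUID);
   }
}

Nothing in the hunk above changes how listeners are registered; only the callback signatures gain the event ID.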
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * To be called right after the ConnectionFactory created a connection.
+ * This listener is not part of the API and shouldn't be used by users.
+ * (if you do so we can't guarantee any API compatibility on this class)
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface AfterConnectInternalListener
+{
+ void onConnection(ClientSessionFactoryInternal sf);
+}
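The new internal hook is what lets ClusterConnectionImpl (further down in this commit) react as soon as a session factory connects. A sketch of the wiring, using the setter added to ServerLocatorInternal later in the diff and a trivial listener standing in for the cluster connection:

import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;

// Sketch only: shows the wiring introduced in this commit, with a trivial
// listener in place of ClusterConnectionImpl.
public class AfterConnectWiring
{
   public static void wire(final ServerLocatorInternal locator)
   {
      locator.setAfterConnectionInternalListener(new AfterConnectInternalListener()
      {
         public void onConnection(final ClientSessionFactoryInternal sf)
         {
            // ClusterConnectionImpl uses this hook to announce the local node
            // (see sendNodeAnnounce further down); here we only log.
            System.out.println("connected through " + sf.getConnectorConfiguration());
         }
      });
   }
}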
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -43,12 +43,13 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
-import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -1291,21 +1292,16 @@
ClientSessionFactoryImpl.log.trace(this + "::Subscribing Topology");
}
- channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
- if (serverLocator.isClusterConnection())
- {
- TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
- if (ClientSessionFactoryImpl.isDebug)
- {
- ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
- ", isBackup=" +
- serverLocator.isBackup());
- }
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
- }
+ channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
+
}
}
+ if (serverLocator.getAfterConnectInternalListener() != null)
+ {
+ serverLocator.getAfterConnectInternalListener().onConnection(this);
+ }
+
if (ClientSessionFactoryImpl.log.isTraceEnabled())
{
ClientSessionFactoryImpl.log.trace("returning " + connection);
@@ -1314,6 +1310,20 @@
return connection;
}
+ /**
+ * @param channel0
+ */
+ public void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig)
+ {
+ Channel channel0 = connection.getChannel(0, -1);
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
+ ", isBackup=" + isBackup);
+ }
+ channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
+ }
+
@Override
public void finalize() throws Throwable
{
@@ -1439,7 +1449,7 @@
if (nodeID != null)
{
- serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
}
closeExecutor.execute(new Runnable()
@@ -1464,7 +1474,7 @@
{
ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
}
- serverLocator.notifyNodeDown(topMessage.getNodeID());
+ serverLocator.notifyNodeDown(System.currentTimeMillis(), topMessage.getNodeID());
}
else
{
@@ -1478,9 +1488,36 @@
" csf created at\nserverLocator=" +
serverLocator, e);
}
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(System.currentTimeMillis(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+ {
+ ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2)packet;
+
+ if (topMessage.isExit())
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
+ }
+ serverLocator.notifyNodeDown(topMessage.getUniqueEventID(), topMessage.getNodeID());
+ }
+ else
+ {
+ if (ClientSessionFactoryImpl.isDebug)
+ {
+ ClientSessionFactoryImpl.log.debug("Node " + topMessage.getNodeID() +
+ " going up, connector = " +
+ topMessage.getPair() +
+ ", isLast=" +
+ topMessage.isLast() +
+ " csf created at\nserverLocator=" +
+ serverLocator, e);
+ }
+ serverLocator.notifyNodeUp(topMessage.getUniqueEventID(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ }
+ }
}
}
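With this change the node announcement is no longer sent automatically when the factory subscribes to topology updates; it is driven explicitly through sendNodeAnnounce, and the backup connector now travels with it. A hedged sketch of what a caller might do from the onConnection hook; the node ID and connectors are placeholders, and the real logic lives in ClusterConnectionImpl:

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;

// Illustrative only: the actual announcement is issued by ClusterConnectionImpl.
public class AnnounceExample
{
   public static void announce(final ClientSessionFactoryInternal sf,
                               final String nodeID,
                               final TransportConfiguration connector,
                               final TransportConfiguration backupConnector)
   {
      // The event ID doubles as the ordering key used by Topology.updateMember.
      long eventID = System.currentTimeMillis();

      sf.sendNodeAnnounce(eventID, nodeID, false /* not a backup */, connector, backupConnector);
   }
}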
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -42,6 +42,8 @@
void removeSession(final ClientSessionInternal session, boolean failingOver);
void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
+
+ void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
TransportConfiguration getConnectorConfiguration();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -165,6 +165,8 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -483,7 +485,9 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
{
this(topology, useHA, groupConfiguration, null);
@@ -545,6 +549,11 @@
}
});
}
+
+ public Executor getExecutor()
+ {
+ return startExecutor;
+ }
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
@@ -554,7 +563,7 @@
finalizeCheck = false;
}
- public ClientSessionFactory connect() throws Exception
+ public ClientSessionFactoryInternal connect() throws Exception
{
ClientSessionFactoryInternal sf;
// static list of initial connectors
@@ -571,6 +580,19 @@
return sf;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
public boolean isClosed()
{
return closed || closing;
@@ -1244,7 +1266,10 @@
closed = true;
}
- public void notifyNodeDown(final String nodeID)
+ /** This is directly called when the connection to the node is gone,
+ * or when the node sends a disconnection.
+ * Look for callers of this method! */
+ public void notifyNodeDown(final long eventTime, final String nodeID)
{
if (topology == null)
@@ -1258,27 +1283,31 @@
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
- topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
+ if (topology.removeMember(eventTime, nodeID))
{
- updateArraysAndPairs();
+ if (topology.isEmpty())
+ {
+ // Resetting the topology to its original condition as it was brand new
+ topologyArray = null;
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
receivedTopology = false;
}
- }
- else
- {
- topologyArray = null;
+ else
+ {
+ updateArraysAndPairs();
- receivedTopology = false;
+ if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+ {
+ // Resetting the topology to its original condition as it was brand new
+ receivedTopology = false;
+ }
+ }
}
}
- public void notifyNodeUp(final String nodeID,
+ public void notifyNodeUp(long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1293,33 +1322,33 @@
log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
}
- topology.addMember(nodeID, new TopologyMember(connectorPair), last);
+ TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
- TopologyMember actMember = topology.getMember(nodeID);
+ if (topology.updateMember(uniqueEventID, nodeID, member))
+ {
- if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
- {
- for (ClientSessionFactory factory : factories)
+ TopologyMember actMember = topology.getMember(nodeID);
+
+ if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
{
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
- actMember.getConnector().b);
+ for (ClientSessionFactory factory : factories)
+ {
+ ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+ actMember.getConnector().b);
+ }
}
- }
- if (connectorPair.a != null)
- {
updateArraysAndPairs();
}
- synchronized (this)
+ if (last)
{
- if (last)
+ synchronized (this)
{
receivedTopology = true;
+ // Notify if waiting on getting topology
+ notifyAll();
}
-
- // Notify if waiting on getting topology
- notifyAll();
}
}
@@ -1569,9 +1598,9 @@
threadPool,
scheduledThreadPool,
interceptors);
-
+
factory.disableFinalizeCheck();
-
+
connectors.add(new Connector(initialConnector, factory));
}
}
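The notifyAll() in the hunk above pairs with a wait elsewhere in the locator, where a caller blocks until the initial topology has arrived. A stripped-down, stand-alone sketch of that handshake (field and method names are invented for the illustration):

// Stand-alone sketch of the receivedTopology/notifyAll handshake; not the
// actual ServerLocatorImpl code.
public class TopologyWaitSketch
{
   private boolean receivedTopology = false;

   public synchronized void onLastNodeUp()
   {
      receivedTopology = true;
      notifyAll(); // wake anyone blocked in waitForTopology
   }

   public synchronized boolean waitForTopology(final long timeoutMillis) throws InterruptedException
   {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (!receivedTopology && System.currentTimeMillis() < deadline)
      {
         wait(1000);
      }
      return receivedTopology;
   }
}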
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -31,8 +31,14 @@
{
void start(Executor executor) throws Exception;
+ Executor getExecutor();
+
void factoryClosed(final ClientSessionFactory factory);
+ AfterConnectInternalListener getAfterConnectInternalListener();
+
+ void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+
/** Used to better identify Cluster Connection Locators on logs while debugging logs */
void setIdentity(String identity);
@@ -42,11 +48,16 @@
void cleanup();
- ClientSessionFactory connect() throws Exception;
+ ClientSessionFactoryInternal connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void notifyNodeUp(long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
- void notifyNodeDown(String nodeID);
+ /**
+ *
+ * @param uniqueEventID 0 means get the previous ID +1
+ * @param nodeID
+ */
+ void notifyNodeDown(long uniqueEventID, String nodeID);
void setClusterConnection(boolean clusterConnection);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -36,16 +36,12 @@
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
-
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
- private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
-
private Executor executor = null;
/** Used to debug operations.
@@ -65,6 +61,8 @@
*/
private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
+ private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
+
public Topology(final Object owner)
{
this.owner = owner;
@@ -104,93 +102,142 @@
}
}
- public boolean addMember(final String nodeId, final TopologyMember memberInput, final boolean last)
+ /** This is called by the server when the node is activated from backup state. It will always succeed */
+ public void updateAsLive(final String nodeId, final TopologyMember memberInput)
{
synchronized (this)
{
- TopologyMember currentMember = mapTopology.get(nodeId);
+ if (log.isDebugEnabled())
+ {
+ log.info(this + "::Live node " + nodeId + "=" + memberInput);
+ }
+ memberInput.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, memberInput);
+ sendMemberUp(memberInput.getUniqueEventID(), nodeId, memberInput);
+ }
+ }
+ /** This is called by the server when the node is activated from backup state. It will always succeed */
+ public TopologyMember updateBackup(final String nodeId, final TopologyMember memberInput)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
+ }
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = getMember(nodeId);
if (currentMember == null)
{
- if (!testBackof(nodeId))
- {
- return false;
- }
+ log.warn("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
+ new Exception("trace"));
+ currentMember = memberInput;
+ mapTopology.put(nodeId, currentMember);
+ }
+
+ TopologyMember newMember = new TopologyMember(currentMember.getA(), memberInput.getB());
+ newMember.setUniqueEventID(System.currentTimeMillis());
+ mapTopology.remove(nodeId);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(newMember.getUniqueEventID(), nodeId, newMember);
+
+ return newMember;
+ }
+ }
+
+ /**
+ *
+ * @param uniqueEventID a unique identifier for when the change was made.
+ * We use the current time in millis for starts, and that value plus one for shutdowns.
+ * @param nodeId
+ * @param memberInput
+ * @return
+ */
+ public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
+ {
+
+ Long deleteTme = mapDelete.get(nodeId);
+ if (deleteTme != null && uniqueEventID < deleteTme)
+ {
+ log.debug("Update uniqueEvent=" + uniqueEventID +
+ ", nodeId=" +
+ nodeId +
+ ", memberInput=" +
+ memberInput +
+ " being rejected as there was a delete done after that");
+ return false;
+ }
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = mapTopology.get(nodeId);
+
+ if (currentMember == null)
+ {
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this + "::NewMemeberAdd " + this +
- " MEMBER WAS NULL, Add member nodeId=" +
+ Topology.log.debug(this + "::NewMemeberAdd nodeId=" +
nodeId +
" member = " +
- memberInput +
- " size = " +
- mapTopology.size(), new Exception("trace"));
+ memberInput, new Exception("trace"));
}
+ memberInput.setUniqueEventID(uniqueEventID);
mapTopology.put(nodeId, memberInput);
- sendMemberUp(nodeId, memberInput);
+ sendMemberUp(uniqueEventID, nodeId, memberInput);
return true;
}
else
{
- if (log.isTraceEnabled())
+ if (uniqueEventID > currentMember.getUniqueEventID())
{
- log.trace(this + ":: validating update for currentMember=" + currentMember + " of memberInput=" + memberInput);
- }
+ TopologyMember newMember = new TopologyMember(memberInput.getA(), memberInput.getB());
- boolean replaced = false;
- TopologyMember memberToSend = currentMember;
+ if (newMember.getA() == null && currentMember.getA() != null)
+ {
+ newMember.setA(currentMember.getA());
+ }
- if (hasChanged("a", memberToSend.getConnector().a, memberInput.getConnector().a))
- {
- if (!replaced && !testBackof(nodeId))
+ if (newMember.getB() == null && currentMember.getB() != null)
{
- return false;
+ newMember.setB(currentMember.getB());
}
- memberToSend = new TopologyMember(memberInput.getConnector().a, memberToSend.getConnector().b);
- replaced = true;
- }
- if (hasChanged("b", memberToSend.getConnector().b, memberInput.getConnector().b))
- {
- if (!replaced && !testBackof(nodeId))
+ if (log.isDebugEnabled())
{
- return false;
+ log.debug(this + "::updated currentMember=nodeID=" +
+ nodeId +
+ ", currentMember=" +
+ currentMember +
+ ", memberInput=" +
+ memberInput +
+ "newMember=" + newMember);
}
- memberToSend = new TopologyMember(memberToSend.getConnector().a, memberInput.getConnector().b);
- replaced = true;
- }
- if (replaced)
- {
+
+ newMember.setUniqueEventID(uniqueEventID);
mapTopology.remove(nodeId);
- mapTopology.put(nodeId, memberToSend);
+ mapTopology.put(nodeId, newMember);
+ sendMemberUp(uniqueEventID, nodeId, newMember);
- sendMemberUp(nodeId, memberToSend);
return true;
}
-
+ else
+ {
+ return false;
+ }
}
}
-
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug(Topology.this + " Add member nodeId=" +
- nodeId +
- " member = " +
- memberInput +
- " has been ignored since there was no change", new Exception("trace"));
- }
-
- return false;
}
/**
* @param nodeId
* @param memberToSend
*/
- private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+ private void sendMemberUp(final long uniqueEventID, final String nodeId, final TopologyMember memberToSend)
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -207,12 +254,17 @@
{
if (Topology.log.isTraceEnabled())
{
- Topology.log.trace(Topology.this + " informing " + listener + " about node up = " + nodeId);
+ Topology.log.trace(Topology.this + " informing " +
+ listener +
+ " about node up = " +
+ nodeId +
+ " connector = " +
+ memberToSend.getConnector());
}
try
{
- listener.nodeUP(nodeId, memberToSend.getConnector(), false);
+ listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
}
catch (Throwable e)
{
@@ -224,46 +276,6 @@
}
/**
- * @param nodeId
- * @param backOfData
- */
- private boolean testBackof(final String nodeId)
- {
- Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
-
- if (backOfData != null)
- {
- backOfData.b += 1;
-
- long timeDiff = System.currentTimeMillis() - backOfData.a;
-
- // To prevent a loop where nodes are being considered down and up
- if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
- {
-
- // The cluster may get in loop without this..
- // Case one node is stll sending nodeDown while another member is sending nodeUp
- log.warn(backOfData.b + ", The topology controller identified a blast events and it's interrupting the flow of the loop, nodeID=" +
- nodeId +
- ", topologyInstance=" +
- this,
- new Exception("this exception is just to trace location"));
- return false;
- }
- else if (timeDiff < BACKOF_TIMEOUT)
- {
- log.warn(this + "::Simple blast of " + nodeId, new Exception("this exception is just to trace location"));
- }
- else if (timeDiff >= BACKOF_TIMEOUT)
- {
- mapBackof.remove(nodeId);
- }
- }
-
- return true;
- }
-
- /**
* @return
*/
private ArrayList<ClusterTopologyListener> copyListeners()
@@ -276,28 +288,26 @@
return listenersCopy;
}
- public boolean removeMember(final String nodeId)
+ public boolean removeMember(final long uniqueEventID, final String nodeId)
{
TopologyMember member;
synchronized (this)
{
- Pair<Long, Integer> value = mapBackof.get(nodeId);
-
- if (value == null)
+ member = mapTopology.get(nodeId);
+ if (member != null)
{
- value = new Pair<Long, Integer>(0l, 0);
- mapBackof.put(nodeId, value);
+ if (member.getUniqueEventID() > uniqueEventID)
+ {
+ log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+ member = null;
+ }
+ else
+ {
+ mapDelete.put(nodeId, uniqueEventID);
+ member = mapTopology.remove(nodeId);
+ }
}
-
- value.a = System.currentTimeMillis();
-
- if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
- {
- value.b = 0;
- }
-
- member = mapTopology.remove(nodeId);
}
if (Topology.log.isDebugEnabled())
@@ -327,7 +337,7 @@
}
try
{
- listener.nodeDown(nodeId);
+ listener.nodeDown(uniqueEventID, nodeId);
}
catch (Exception e)
{
@@ -354,14 +364,13 @@
}
/**
- * it will send all the member updates to listeners, independently of being changed or not
+ * it will send the member to its listeners
* @param nodeID
* @param member
*/
- public void sendMemberToListeners(final String nodeID, final TopologyMember member)
+ public void sendMember(final String nodeID)
{
- // To make sure it was updated
- addMember(nodeID, member, false);
+ final TopologyMember member = getMember(nodeID);
final ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -380,7 +389,7 @@
" with connector=" +
member.getConnector());
}
- listener.nodeUP(nodeID, member.getConnector(), false);
+ listener.nodeUP(member.getUniqueEventID(), nodeID, member.getConnector(), false);
}
}
});
@@ -417,18 +426,21 @@
" to " +
listener);
}
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+ listener.nodeUP(entry.getValue().getUniqueEventID(),
+ entry.getKey(),
+ entry.getValue().getConnector(),
+ ++count == copy.size());
}
}
});
}
- public TopologyMember getMember(final String nodeID)
+ public synchronized TopologyMember getMember(final String nodeID)
{
return mapTopology.get(nodeID);
}
- public boolean isEmpty()
+ public synchronized boolean isEmpty()
{
return mapTopology.isEmpty();
}
@@ -448,11 +460,11 @@
int count = 0;
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null)
+ if (member.getA() != null)
{
count++;
}
- if (member.getConnector().b != null)
+ if (member.getB() != null)
{
count++;
}
@@ -499,30 +511,13 @@
this.owner = owner;
}
- private boolean hasChanged(final String debugInfo, final TransportConfiguration a, final TransportConfiguration b)
- {
- boolean changed = a == null && b != null || a != null && b != null && !a.equals(b);
-
- if (log.isTraceEnabled())
- {
-
- log.trace(this + "::Validating current=" + a
- + " != input=" + b +
- (changed ? " and it has changed" : " and it didn't change") +
- ", for validation of " +
- debugInfo);
- }
-
- return changed;
- }
-
public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
{
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
+ if (member.getA() != null && member.getA().equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getB();
}
}
return null;
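The core of the change above is the ordering rule: an update only wins if its event ID is newer than the stored member's, and a removal records its event ID in mapDelete so a late copy of an older node-up cannot resurrect the node. A toy model of just that rule, separate from the HornetQ classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the ordering introduced above: newer event IDs win, and an
// update older than a recorded delete is rejected. Not HornetQ code.
public class EventOrderingSketch
{
   private final Map<String, Long> members = new ConcurrentHashMap<String, Long>();
   private final Map<String, Long> deletes = new ConcurrentHashMap<String, Long>();

   public synchronized boolean update(final long eventID, final String nodeID)
   {
      Long deletedAt = deletes.get(nodeID);
      if (deletedAt != null && eventID < deletedAt)
      {
         return false; // a later delete already superseded this update
      }
      Long current = members.get(nodeID);
      if (current != null && eventID <= current)
      {
         return false; // stale update
      }
      members.put(nodeID, eventID);
      return true;
   }

   public synchronized boolean remove(final long eventID, final String nodeID)
   {
      Long current = members.get(nodeID);
      if (current == null)
      {
         return false; // nothing to remove
      }
      if (current > eventID)
      {
         return false; // node was re-announced after this delete was issued
      }
      deletes.put(nodeID, eventID);
      members.remove(nodeID);
      return true;
   }

   public static void main(String[] args)
   {
      EventOrderingSketch t = new EventOrderingSketch();
      System.out.println(t.update(1000, "node-1")); // true
      System.out.println(t.remove(2000, "node-1")); // true
      System.out.println(t.update(1500, "node-1")); // false: older than the delete
      System.out.println(t.update(2500, "node-1")); // true: newer than the delete
   }
}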
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -27,21 +27,61 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
+ /** transient to avoid serialization changes */
+ private transient long uniqueEventID = System.currentTimeMillis();
+
+ public TopologyMember(final Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
+ uniqueEventID = System.currentTimeMillis();
}
- public TopologyMember(TransportConfiguration a, TransportConfiguration b)
+ public TopologyMember(final TransportConfiguration a, final TransportConfiguration b)
{
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
+ public TransportConfiguration getA()
+ {
+ return connector.a;
+ }
+
+ public TransportConfiguration getB()
+ {
+ return connector.b;
+ }
+
+ public void setB(final TransportConfiguration param)
+ {
+ connector.b = param;
+ }
+
+ public void setA(final TransportConfiguration param)
+ {
+ connector.a = param;
+ }
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ /**
+ * @param uniqueEventID the uniqueEventID to set
+ */
+ public void setUniqueEventID(final long uniqueEventID)
+ {
+ this.uniqueEventID = uniqueEventID;
+ }
+
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
}
-
+
@Override
public String toString()
{
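TopologyMember now exposes the live (A) and backup (B) connectors directly and carries a transient unique event ID. A small illustration of the live/backup merge that updateBackup performs; the connector factory and its parameters are placeholders chosen for the example, not values from the commit:

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

// Illustration of the live/backup pair held by TopologyMember; the connectors
// here are made up for the example.
public class TopologyMemberExample
{
   public static void main(String[] args)
   {
      TransportConfiguration live = new TransportConfiguration(NettyConnectorFactory.class.getName());
      TransportConfiguration backup = new TransportConfiguration(NettyConnectorFactory.class.getName());

      TopologyMember member = new TopologyMember(live, null);

      // What updateBackup does: keep the existing live (A) and take the new backup (B).
      TopologyMember merged = new TopologyMember(member.getA(), backup);
      merged.setUniqueEventID(System.currentTimeMillis());

      System.out.println("live=" + merged.getA() + ", backup=" + merged.getB());
   }
}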
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -242,6 +242,7 @@
try
{
clusterConnection.start();
+ clusterConnection.flushExecutor();
}
finally
{
@@ -255,6 +256,7 @@
try
{
clusterConnection.stop();
+ clusterConnection.flushExecutor();
}
finally
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -373,8 +373,12 @@
if (complete)
{
- log.info("Address " + pagingStore.getAddress() +
+ if (log.isDebugEnabled())
+ {
+ log.debug("Address " + pagingStore.getAddress() +
" is leaving page mode as all messages are consumed and acknowledged from the page store");
+ }
+
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/Channel.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -36,6 +36,9 @@
* @return the id
*/
long getID();
+
+ /** For protocol check */
+ boolean supports(byte packetID);
/**
* sends a packet on this channel.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -25,6 +25,15 @@
*/
public interface CoreRemotingConnection extends RemotingConnection
{
+
+ /** The client protocol used on the communication.
+ * This will determine if the client has support for certain packet types */
+ int getClientVersion();
+
+ /** The client protocol used on the communication.
+ * This will determine if the client has support for certain packet types */
+ void setClientVersion(int clientVersion);
+
/**
* return the channel with the channel id specified.
* <p/>
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -94,6 +94,19 @@
resendCache = null;
}
}
+
+ public boolean supports(final byte packetType)
+ {
+ int version = connection.getClientVersion();
+
+ switch (packetType)
+ {
+ case PacketImpl.CLUSTER_TOPOLOGY_V2:
+ return version >= 122;
+ default:
+ return true;
+ }
+ }
public long getID()
{
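supports() is the compatibility gate: a server at version 122 keeps sending the old packet to clients that reported 121 or lower. The same pattern that CoreProtocolManager applies below, shown here in isolation on channel 0:

import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;

// Sketch of the version gate used when pushing topology updates to a client.
public class TopologyUpdateSender
{
   public static void sendNodeUp(final Channel channel0,
                                 final long uniqueEventID,
                                 final String nodeID,
                                 final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                 final boolean last)
   {
      if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
      {
         // 122+ clients understand the event ID
         channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID, connectorPair, last));
      }
      else
      {
         // older clients get the original packet and fall back to their own clock
         channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
      }
   }
}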
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -32,9 +32,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -112,13 +114,19 @@
// Just send a ping back
channel0.send(packet);
}
- else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
+ else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY || packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
+ if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
+ {
+ channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2)msg).getClientVersion());
+ }
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -129,12 +137,19 @@
{
public void run()
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ {
+ channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID, connectorPair, last));
+ }
+ else
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ }
}
});
}
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long uniqueEventID, final String nodeID)
{
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
@@ -143,7 +158,14 @@
{
public void run()
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ {
+ channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID));
+ }
+ else
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
}
});
}
@@ -177,13 +199,13 @@
}
else
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), msg.getBackupConnector());
}
if (isTrace)
{
log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
+ server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -163,6 +163,17 @@
"Server will not accept create session requests");
}
+ if (connection.getClientVersion() == 0)
+ {
+ connection.setClientVersion(request.getVersion());
+ }
+ else if (connection.getClientVersion() != request.getVersion())
+ {
+ log.warn("Client is not being consistent on the request versioning. " +
+ "It just sent a version id=" + request.getVersion() +
+ " while it informed " + connection.getClientVersion() + " previously");
+ }
+
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
ServerSession session = server.createSession(request.getName(),
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
@@ -83,11 +84,13 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -152,6 +155,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
/**
* A PacketDecoder
@@ -504,6 +508,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case CLUSTER_TOPOLOGY_V2:
+ {
+ packet = new ClusterTopologyChangeMessage_V2();
+ break;
+ }
case NODE_ANNOUNCE:
{
packet = new NodeAnnounceMessage();
@@ -514,6 +523,11 @@
packet = new SubscribeClusterTopologyUpdatesMessage();
break;
}
+ case SUBSCRIBE_TOPOLOGY_V2:
+ {
+ packet = new SubscribeClusterTopologyUpdatesMessageV2();
+ break;
+ }
case SESS_ADD_METADATA:
{
packet = new SessionAddMetaDataMessage();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -193,7 +193,13 @@
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
+ // For newer versions
+ public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
+
+ public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -72,6 +72,8 @@
private volatile boolean destroyed;
private final boolean client;
+
+ private int clientVersion;
// Channels 0-9 are reserved for the system
// 0 is for pinging
@@ -183,7 +185,23 @@
failureListeners.addAll(listeners);
}
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+ /**
+ * @param clientVersion the clientVersion to set
+ */
+ public void setClientVersion(int clientVersion)
+ {
+ this.clientVersion = clientVersion;
+ }
+
public Object getID()
{
return transportConnection.getID();
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Clebert Suconic
+ *
+ */
+public class ClusterTopologyChangeMessage_V2 extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean exit;
+
+ private String nodeID;
+
+ private Pair<TransportConfiguration, TransportConfiguration> pair;
+
+ private long uniqueEventID;
+
+ private boolean last;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.nodeID = nodeID;
+
+ this.pair = pair;
+
+ this.last = last;
+
+ this.exit = false;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+ this.exit = true;
+
+ this.nodeID = nodeID;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V2()
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getPair()
+ {
+ return pair;
+ }
+
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ /**
+ * @return the uniqueEventID
+ */
+ public long getUniqueEventID()
+ {
+ return uniqueEventID;
+ }
+
+ public boolean isExit()
+ {
+ return exit;
+ }
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeBoolean(exit);
+ buffer.writeString(nodeID);
+ buffer.writeLong(uniqueEventID);
+ if (!exit)
+ {
+ if (pair.a != null)
+ {
+ buffer.writeBoolean(true);
+ pair.a.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (pair.b != null)
+ {
+ buffer.writeBoolean(true);
+ pair.b.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ buffer.writeBoolean(last);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ exit = buffer.readBoolean();
+ nodeID = buffer.readString();
+ uniqueEventID = buffer.readLong();
+ if (!exit)
+ {
+ boolean hasLive = buffer.readBoolean();
+ TransportConfiguration a;
+ if(hasLive)
+ {
+ a = new TransportConfiguration();
+ a.decode(buffer);
+ }
+ else
+ {
+ a = null;
+ }
+ boolean hasBackup = buffer.readBoolean();
+ TransportConfiguration b;
+ if (hasBackup)
+ {
+ b = new TransportConfiguration();
+ b.decode(buffer);
+ }
+ else
+ {
+ b = null;
+ }
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+ last = buffer.readBoolean();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
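A quick round trip through encodeRest/decodeRest for the "exit" (node-down) form of the new packet, which carries no connectors. This sketch assumes the HornetQBuffers factory from the public API behaves as usual; the methods it exercises are exactly the ones added above:

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;

// Sketch: encode the node-down ("exit") variant and decode it back.
public class TopologyV2RoundTrip
{
   public static void main(String[] args)
   {
      ClusterTopologyChangeMessage_V2 out =
         new ClusterTopologyChangeMessage_V2(System.currentTimeMillis(), "node-1");

      HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(64);
      out.encodeRest(buffer);

      ClusterTopologyChangeMessage_V2 in = new ClusterTopologyChangeMessage_V2();
      in.decodeRest(buffer);

      System.out.println(in.isExit() + " " + in.getNodeID() + " " + in.getUniqueEventID());
   }
}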
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -34,21 +34,29 @@
private boolean backup;
+ private long currentEventID;
+
private TransportConfiguration connector;
+ private TransportConfiguration backupConnector;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ public NodeAnnounceMessage(final long currentEventID, final String nodeID, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
{
super(PacketImpl.NODE_ANNOUNCE);
+ this.currentEventID = currentEventID;
+
this.nodeID = nodeID;
this.backup = backup;
this.connector = tc;
+
+ this.backupConnector = backupConnector;
}
public NodeAnnounceMessage()
@@ -74,13 +82,43 @@
return connector;
}
+ public TransportConfiguration getBackupConnector()
+ {
+ return backupConnector;
+ }
+ /**
+ * @return the currentEventID
+ */
+ public long getCurrentEventID()
+ {
+ return currentEventID;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
buffer.writeBoolean(backup);
- connector.encode(buffer);
+ buffer.writeLong(currentEventID);
+ if (connector != null)
+ {
+ buffer.writeBoolean(true);
+ connector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ if (backupConnector != null)
+ {
+ buffer.writeBoolean(true);
+ backupConnector.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
}
@Override
@@ -88,8 +126,17 @@
{
this.nodeID = buffer.readString();
this.backup = buffer.readBoolean();
- connector = new TransportConfiguration();
- connector.decode(buffer);
+ this.currentEventID = buffer.readLong();
+ if (buffer.readBoolean())
+ {
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+ if (buffer.readBoolean())
+ {
+ backupConnector = new TransportConfiguration();
+ backupConnector.decode(buffer);
+ }
}
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -42,11 +42,23 @@
this.clusterConnection = clusterConnection;
}
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
+ {
+ super(packetType);
+
+ this.clusterConnection = clusterConnection;
+ }
+
public SubscribeClusterTopologyUpdatesMessage()
{
super(PacketImpl.SUBSCRIBE_TOPOLOGY);
}
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
+ {
+ super(packetType);
+ }
+
// Public --------------------------------------------------------
public boolean isClusterConnection()
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int clientVersion;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
+
+ this.clientVersion = clientVersion;
+ }
+
+ public SubscribeClusterTopologyUpdatesMessageV2()
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ buffer.writeInt(clientVersion);
+ }
+
+ /**
+ * @return the clientVersion
+ */
+ public int getClientVersion()
+ {
+ return clientVersion;
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ clientVersion = buffer.readInt();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
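This is the packet the client now sends on channel 0 when subscribing, as shown in the ClientSessionFactoryImpl hunk earlier; the version it carries is what the server stores through setClientVersion and later feeds into Channel.supports. A small sketch constructing it the same way; VersionLoader is assumed to be org.hornetq.utils.VersionLoader, the class the factory already uses for this call:

import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.utils.VersionLoader;

// Mirrors the send in ClientSessionFactoryImpl: subscribe as a cluster
// connection and advertise this client's incrementing version.
public class SubscribeV2Example
{
   public static SubscribeClusterTopologyUpdatesMessageV2 build(final boolean clusterConnection)
   {
      int clientVersion = VersionLoader.getVersion().getIncrementingVersion();
      return new SubscribeClusterTopologyUpdatesMessageV2(clusterConnection, clientVersion);
   }
}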
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -15,12 +15,9 @@
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.persistence.OperationContext;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -46,6 +46,8 @@
void activate() throws Exception;
TransportConfiguration getConnector();
+
+ void flushExecutor();
// for debug
String describe();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -48,10 +48,8 @@
void activate();
- void notifyNodeDown(String nodeID);
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
-
Topology getTopology();
void flushExecutor();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -101,7 +101,7 @@
private final Transformer transformer;
- private volatile ClientSessionFactory csf;
+ private volatile ClientSessionFactoryInternal csf;
protected volatile ClientSessionInternal session;
@@ -200,6 +200,7 @@
{
this.notificationService = notificationService;
}
+
public synchronized void start() throws Exception
{
if (started)
@@ -652,6 +653,15 @@
}
}
+ /**
+ * @return
+ */
+ protected ClientSessionFactoryInternal getCurrentFactory()
+ {
+ return csf;
+ }
+
+
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -71,6 +71,8 @@
private final SimpleString idsHeaderName;
private final String targetNodeID;
+
+ private final long targetNodeEventUID;
private final TransportConfiguration connector;
@@ -85,6 +87,7 @@
final double retryMultiplier,
final long maxRetryInterval,
final UUID nodeUUID,
+ final long targetNodeEventUID,
final String targetNodeID,
final SimpleString name,
final Queue queue,
@@ -130,6 +133,7 @@
this.clusterManager = clusterManager;
+ this.targetNodeEventUID = targetNodeEventUID;
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
@@ -319,7 +323,7 @@
if (permanently)
{
log.debug("cluster node for bridge " + this.getName() + " is permanently down");
- discoveryLocator.notifyNodeDown(targetNodeID);
+ discoveryLocator.notifyNodeDown(targetNodeEventUID+1, targetNodeID);
}
}
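ClusterConnectionBridge now remembers the event UID of the announcement that created it, and when the target node is permanently down it reports the failure as targetNodeEventUID + 1. A plausible reading is that the removal must carry a newer UID than the announcement it cancels; the sketch below illustrates that ordering rule with hypothetical names, not the HornetQ locator API.

import java.util.HashMap;
import java.util.Map;

// A node-down event only wins if its UID is newer than the announcement that
// is currently recorded for the node; equal or older UIDs are ignored.
public class NodeDownOrderingSketch
{
   private final Map<String, Long> lastEventPerNode = new HashMap<String, Long>();

   public synchronized void nodeUp(final long eventUID, final String nodeID)
   {
      lastEventPerNode.put(nodeID, eventUID);
   }

   public synchronized boolean nodeDown(final long eventUID, final String nodeID)
   {
      Long known = lastEventPerNode.get(nodeID);
      if (known == null || eventUID > known.longValue())
      {
         lastEventPerNode.remove(nodeID);
         return true; // removal accepted
      }
      return false; // stale removal, the newer announcement wins
   }

   public static void main(final String[] args)
   {
      NodeDownOrderingSketch sketch = new NodeDownOrderingSketch();
      sketch.nodeUp(42L, "node-2");
      System.out.println(sketch.nodeDown(42L, "node-2"));     // false: same UID is not newer
      System.out.println(sketch.nodeDown(42L + 1, "node-2")); // true: UID + 1 supersedes it
   }
}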
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -33,9 +33,10 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.AfterConnectInternalListener;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
@@ -56,6 +57,7 @@
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -69,7 +71,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection
+public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -332,20 +334,34 @@
this.clusterManagerTopology = clusterManagerTopology;
}
- public synchronized void start() throws Exception
+ public void start() throws Exception
{
- if (started)
+ synchronized (this)
{
- return;
+ if (started)
+ {
+ return;
+ }
+
+
+ started = true;
+
+ if (!backup)
+ {
+ activate();
+ }
}
- started = true;
-
- if (!backup)
+ }
+
+ public void flushExecutor()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ if (!future.await(10000))
{
- activate();
+ server.threadDump("Couldn't finish executor on " + this);
}
-
}
public void stop() throws Exception
@@ -411,6 +427,25 @@
started = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
+ */
+ public void onConnection(ClientSessionFactoryInternal sf)
+ {
+ TopologyMember localMember = manager.getLocalMember();
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().a,
+ localMember.getConnector().b);
+
+ // sf.sendNodeAnnounce(System.currentTimeMillis(),
+ // manager.getNodeId(),
+ // false,
+ // localMember.getConnector().a,
+ // localMember.getConnector().b);
+ }
+
public boolean isStarted()
{
return started;
@@ -471,6 +506,14 @@
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
}
+ final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+
+ if (currentMember == null)
+ {
+ // sanity check only
+ throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+ }
+
serverLocator.setNodeID(nodeUUID.toString());
serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
serverLocator.setReconnectAttempts(0);
@@ -493,6 +536,8 @@
serverLocator.addClusterTopologyListener(this);
+ serverLocator.setAfterConnectionInternalListener(this);
+
serverLocator.start(server.getExecutorFactory().getExecutor());
}
@@ -515,7 +560,7 @@
// ClusterTopologyListener implementation ------------------------------------------------------------------
- public void nodeDown(final String nodeID)
+ public void nodeDown(final long eventUID, final String nodeID)
{
if (log.isDebugEnabled())
{
@@ -544,12 +589,11 @@
{
log.error("Failed to close flow record", e);
}
-
- server.getClusterManager().notifyNodeDown(nodeID);
}
}
- public void nodeUP(final String nodeID,
+ public void nodeUP(final long eventUID,
+ final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -562,20 +606,18 @@
if (nodeID.equals(nodeUUID.toString()))
{
- if (connectorPair.b != null)
+ if (log.isTraceEnabled())
{
- if (log.isTraceEnabled())
- {
- log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
- }
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+ log.trace(this + "::informing about backup to itself, nodeUUID=" +
+ nodeUUID +
+ ", connectorPair=" +
+ connectorPair +
+ " this = " +
+ this);
}
return;
}
- // we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
@@ -615,6 +657,7 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
}
+ log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -635,7 +678,7 @@
queue = server.createQueue(queueName, queueName, null, true, false);
}
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+ createNewRecord(eventUID, nodeID, connectorPair.a, queueName, queue, true);
}
else
{
@@ -656,7 +699,8 @@
}
}
- private void createNewRecord(final String targetNodeID,
+ private void createNewRecord(final long eventUID,
+ final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
@@ -679,6 +723,8 @@
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ targetLocator.setAfterConnectionInternalListener(this);
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
@@ -689,9 +735,14 @@
}
targetLocator.disableFinalizeCheck();
-
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
+ eventUID,
+ targetNodeID,
+ connector,
+ queueName,
+ queue);
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
targetLocator,
@@ -701,6 +752,7 @@
retryIntervalMultiplier,
maxRetryInterval,
nodeUUID,
+ record.getEventUID(),
record.getTargetNodeID(),
record.getQueueName(),
record.getQueue(),
@@ -742,6 +794,8 @@
{
private BridgeImpl bridge;
+ private final long eventUID;
+
private final String targetNodeID;
private final TransportConfiguration connector;
@@ -761,6 +815,7 @@
private volatile boolean firstReset = false;
public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+ final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
@@ -771,6 +826,7 @@
this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
+ this.eventUID = eventUID;
}
/* (non-Javadoc)
@@ -804,6 +860,14 @@
}
/**
+ * @return the eventUID
+ */
+ public long getEventUID()
+ {
+ return eventUID;
+ }
+
+ /**
* @return the nodeID
*/
public String getTargetNodeID()
@@ -1091,7 +1155,8 @@
// hops is too high
// or there are multiple cluster connections for the same address
- ClusterConnectionImpl.log.warn("Remote queue binding " + clusterName +
+ ClusterConnectionImpl.log.warn(this + "::Remote queue binding " +
+ clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -168,6 +168,16 @@
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
+
+ public TopologyMember getLocalMember()
+ {
+ return topology.getMember(nodeUUID.toString());
+ }
+
+ public String getNodeId()
+ {
+ return nodeUUID.toString();
+ }
public synchronized void start() throws Exception
{
@@ -183,9 +193,34 @@
deployBroadcastGroup(config);
}
+ String connectorName = null;
+
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- deployClusterConnection(config);
+ if (connectorName == null)
+ {
+ connectorName = config.getConnectorName();
+ break;
+ }
+ }
+
+ if (connectorName != null)
+ {
+ TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
+ if (nodeConnector == null)
+ {
+ log.warn("No connecor with name '" + connectorName +
+ "'. The cluster connection will not be deployed.");
+ return;
+ }
+
+ // Now announce presence
+ announceNode(nodeConnector);
+
+ for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
+ {
+ deployClusterConnection(config);
+ }
}
}
@@ -195,13 +230,6 @@
deployBridge(config);
}
- // Now announce presence
-
- if (clusterConnections.size() > 0)
- {
- announceNode();
- }
-
started = true;
}
@@ -264,81 +292,26 @@
clusterConnections.clear();
}
- public void notifyNodeDown(String nodeID)
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean backup)
{
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
- log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
-
- topology.removeMember(nodeID);
-
- }
-
- public void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final boolean nodeAnnounce)
- {
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeUp " + nodeID + connectorPair + ", nodeAnnounce=" + nodeAnnounce);
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
}
- TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member, last);
-
- if (!updated)
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
{
- if (log.isDebugEnabled())
- {
- log.debug(this + " ignored notifyNodeUp on nodeID=" +
- nodeID +
- " pair=" +
- connectorPair +
- " as the topology already knew about it");
- }
- return;
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
}
-
- if (log.isDebugEnabled())
+ else
{
- log.debug(this + " received notifyNodeUp nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- ", nodeAnnounce=" +
- nodeAnnounce +
- ", last=" +
- last);
+ topology.updateMember(uniqueEventID, nodeID, newMember);
}
-
- // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
- // connections.
- if (nodeAnnounce)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Informing " + nodeID + " to " + clusterConnections.toString());
- }
- for (ClusterConnection clusterConnection : clusterConnections.values())
- {
- if (log.isTraceEnabled())
- {
- log.trace(this + " information clusterConnection=" +
- clusterConnection +
- " nodeID=" +
- nodeID +
- " connectorPair=" +
- connectorPair +
- " last=" +
- last);
- }
- clusterConnection.nodeUP(nodeID, connectorPair, last);
- }
- }
}
public void flushExecutor()
@@ -347,7 +320,8 @@
executor.execute(future);
if (!future.await(10000))
{
- server.threadDump("Couldn't flush ClusterManager executor (" + this + ") in 10 seconds, verify your thread pool size");
+ server.threadDump("Couldn't flush ClusterManager executor (" + this +
+ ") in 10 seconds, verify your thread pool size");
}
}
@@ -405,9 +379,8 @@
TopologyMember member = topology.getMember(nodeID);
// swap backup as live and send it to everybody
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
- null));
- topology.addMember(nodeID, member, false);
+ member = new TopologyMember(member.getConnector().b, null);
+ topology.updateAsLive(nodeID, member);
if (backupServerLocator != null)
{
@@ -460,7 +433,7 @@
}
}
- topology.sendMemberToListeners(nodeID, member);
+ topology.sendMember(nodeID);
}
}
@@ -496,43 +469,21 @@
this.clusterLocators.remove(serverLocator);
}
- private synchronized void announceNode()
+ private synchronized void announceNode(final TransportConfiguration nodeConnector)
{
- // TODO does this really work with more than one cluster connection? I think not
-
- // Just take the first one for now
- ClusterConnection cc = clusterConnections.values().iterator().next();
-
String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
-
- if (member == null)
+
+ TopologyMember localMember;
+ if (backup)
{
- if (backup)
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
- cc.getConnector()));
- }
- else
- {
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
- null));
- }
-
- topology.addMember(nodeID, member, false);
+ localMember = new TopologyMember(null, nodeConnector);
}
else
{
- if (backup)
- {
- // pair.b = cc.getConnector();
- }
- else
- {
- // pair.a = cc.getConnector();
- }
+ localMember = new TopologyMember(nodeConnector, null);
}
+
+ topology.updateAsLive(nodeID, localMember);
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
@@ -957,7 +908,11 @@
{
backupSessionFactory.getConnection()
.getChannel(0, -1)
- .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
log.info("backup announced");
}
}
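ClusterManagerImpl now resolves the node connector from the first cluster connection's connector name, warns and bails out if it is missing, and announces the local node as live before any cluster connection is deployed. A compact sketch of that start-up ordering follows; the names and configuration values are made up for the example.

import java.util.HashMap;
import java.util.Map;

// Resolve the node connector by name, bail out with a warning if it is not
// configured, announce the local node as live, then deploy the connections.
public class AnnounceBeforeDeploySketch
{
   private final Map<String, String> connectorConfigurations = new HashMap<String, String>();

   public AnnounceBeforeDeploySketch()
   {
      connectorConfigurations.put("netty-connector", "tcp://localhost:5445");
   }

   public void start(final String clusterConnectorName)
   {
      String nodeConnector = connectorConfigurations.get(clusterConnectorName);
      if (nodeConnector == null)
      {
         System.out.println("No connector with name '" + clusterConnectorName +
                            "'. The cluster connection will not be deployed.");
         return;
      }

      announceNode(nodeConnector);  // 1. publish the local member as live
      deployClusterConnections();   // 2. only then wire up the cluster connections
   }

   private void announceNode(final String nodeConnector)
   {
      System.out.println("announcing live member on " + nodeConnector);
   }

   private void deployClusterConnections()
   {
      System.out.println("deploying cluster connections");
   }

   public static void main(final String[] args)
   {
      new AnnounceBeforeDeploySketch().start("netty-connector");
   }
}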
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -14,6 +14,7 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -28,5 +29,9 @@
void addClusterLocator(ServerLocatorInternal locator);
void removeClusterLocator(ServerLocatorInternal locator);
+
+ TopologyMember getLocalMember();
+
+ String getNodeId();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -354,7 +354,6 @@
if(nodeManager.isBackupLive())
{
- Thread.sleep(configuration.getFailbackDelay());
//looks like we've failed over at some point need to inform that we are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
@@ -514,7 +513,7 @@
if (System.currentTimeMillis() - start >= timeout)
{
- log.warn("Timed out waiting for backup activation to exit");
+ threadDump("Timed out waiting for backup activation to exit");
}
nodeManager.stopBackup();
@@ -862,6 +861,10 @@
nodeManager.stop();
nodeManager = null;
+
+ addressSettingsRepository.clearListeners();
+
+ addressSettingsRepository.clearCache();
HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
@@ -2004,7 +2007,14 @@
public String toString()
{
- return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ }
}
// Inner classes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -83,12 +83,12 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession , FailureListener
+public class ServerSessionImpl implements ServerSession, FailureListener
{
// Constants -----------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Static -------------------------------------------------------------------------------
@@ -147,14 +147,14 @@
private volatile SimpleString defaultAddress;
private volatile int timeoutSeconds;
-
+
private Map<String, String> metaData;
-
+
private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
-
+ private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+
private long creationTime = System.currentTimeMillis();
// Constructors ---------------------------------------------------------------------------------
@@ -244,7 +244,6 @@
this.sessionContext = sessionContext;
}
-
public String getUsername()
{
return username;
@@ -269,8 +268,9 @@
{
return remotingConnection.getID();
}
-
- public Set<ServerConsumer> getServerConsumers() {
+
+ public Set<ServerConsumer> getServerConsumers()
+ {
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
return Collections.unmodifiableSet(consumersClone);
}
@@ -316,7 +316,7 @@
}
remotingConnection.removeFailureListener(this);
-
+
callback.closed();
}
@@ -402,7 +402,7 @@
// dies. It does not mean it will get deleted automatically when the
// session is closed.
// It is up to the user to delete the queue when finished with it
-
+
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name, queue);
remotingConnection.addCloseListener(cleaner);
@@ -411,8 +411,7 @@
tempQueueCleannerUppers.put(name, cleaner);
}
}
-
-
+
/**
* For test cases only
* @return
@@ -427,7 +426,7 @@
private final PostOffice postOffice;
private final SimpleString bindingName;
-
+
private final Queue queue;
TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName, final Queue queue)
@@ -435,7 +434,7 @@
this.postOffice = postOffice;
this.bindingName = bindingName;
-
+
this.queue = queue;
}
@@ -443,15 +442,15 @@
{
try
{
- if (log.isDebugEnabled())
- {
- log.debug("deleting temporary queue " + bindingName);
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("deleting temporary queue " + bindingName);
+ }
if (postOffice.getBinding(bindingName) != null)
{
postOffice.removeBinding(bindingName);
}
-
+
queue.deleteAllReferences();
}
catch (Exception e)
@@ -469,7 +468,7 @@
{
run();
}
-
+
public String toString()
{
return "Temporary Cleaner for queue " + bindingName;
@@ -489,11 +488,11 @@
server.destroyQueue(name, this);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
-
+
if (cleaner != null)
{
remotingConnection.removeCloseListener(cleaner);
-
+
remotingConnection.removeFailureListener(cleaner);
}
}
@@ -576,8 +575,8 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
- consumer.acknowledge(autoCommitAcks, tx, messageID);
+
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
}
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
@@ -935,7 +934,7 @@
throw new HornetQXAException(XAException.XAER_PROTO,
"Cannot prepare transaction, it is suspended " + xid);
}
- else if(theTx.getState() == Transaction.State.PREPARED)
+ else if (theTx.getState() == Transaction.State.PREPARED)
{
log.info("ignoring prepare on xid as already called :" + xid);
}
@@ -966,7 +965,7 @@
public void xaSetTimeout(final int timeout)
{
timeoutSeconds = timeout;
- if(tx != null)
+ if (tx != null)
{
tx.setTimeout(timeout);
}
@@ -981,18 +980,18 @@
{
setStarted(false);
}
-
+
public void waitContextCompletion()
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
try
{
if (!storageManager.waitOnOperations(10000))
{
- log.warn("Couldn't finish context execution in 10 seconds", new Exception ("warning"));
+ log.warn("Couldn't finish context execution in 10 seconds", new Exception("warning"));
}
}
catch (Exception e)
@@ -1009,7 +1008,7 @@
public void close(final boolean failed)
{
OperationContext formerCtx = storageManager.getContext();
-
+
try
{
storageManager.setContext(sessionContext);
@@ -1019,7 +1018,7 @@
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
try
@@ -1071,9 +1070,9 @@
{
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
-
+
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
@@ -1085,7 +1084,7 @@
public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
-
+
SimpleString address = message.getAddress();
message.setMessageID(id);
@@ -1111,7 +1110,6 @@
log.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
-
if (message.getAddress().equals(managementAddress))
{
// It's a management message
@@ -1129,7 +1127,10 @@
}
}
- public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
+ public void sendContinuations(final int packetSize,
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -1144,7 +1145,7 @@
if (!continues)
{
currentLargeMessage.releaseResources();
-
+
if (messageBodySize >= 0)
{
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
@@ -1178,7 +1179,6 @@
consumer.setTransferring(transferring);
}
}
-
public void addMetaData(String key, String data)
{
@@ -1198,7 +1198,7 @@
}
return data;
}
-
+
public String[] getTargetAddresses()
{
Map<SimpleString, Pair<UUID, AtomicLong>> copy = cloneTargetAddresses();
@@ -1238,7 +1238,7 @@
public void describeProducersInfo(JSONArray array) throws Exception
{
Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy = cloneTargetAddresses();
-
+
for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry : targetCopy.entrySet())
{
JSONObject producerInfo = new JSONObject();
@@ -1251,7 +1251,6 @@
}
}
-
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1271,7 +1270,6 @@
}
}
-
// Public
// ----------------------------------------------------------------------------
@@ -1337,7 +1335,7 @@
toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
-
+
for (MessageReference ref : toCancel)
{
ref.getQueue().cancel(theTx, ref);
@@ -1379,12 +1377,12 @@
}
postOffice.route(msg, routingContext, direct);
-
+
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-
+
if (value == null)
{
- targetAddressInfos.put(msg.getAddress(), new Pair<UUID,AtomicLong>(msg.getUserID(), new AtomicLong(1)));
+ targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1)));
}
else
{
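Most of the ServerSessionImpl hunks are whitespace clean-up; the functional piece kept at the end is the targetAddressInfos map that records, per target address, the first message's user ID plus a running send count. A simplified sketch of that per-address counting follows (an illustrative class; the real map stores a Pair<UUID, AtomicLong>).

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Per-address send statistics: create a counter on the first send to an
// address and bump it on every following send.
public class ProducerStatsSketch
{
   // Session usage is single threaded by definition, so a plain HashMap is enough.
   private final Map<String, AtomicLong> targetAddressInfos = new HashMap<String, AtomicLong>();

   public void recordSend(final String address)
   {
      AtomicLong counter = targetAddressInfos.get(address);
      if (counter == null)
      {
         targetAddressInfos.put(address, new AtomicLong(1));
      }
      else
      {
         counter.incrementAndGet();
      }
   }

   public long sentTo(final String address)
   {
      AtomicLong counter = targetAddressInfos.get(address);
      return counter == null ? 0 : counter.get();
   }

   public static void main(final String[] args)
   {
      ProducerStatsSketch stats = new ProducerStatsSketch();
      stats.recordSend("jms.queue.orders");
      stats.recordSend("jms.queue.orders");
      System.out.println(stats.sentTo("jms.queue.orders")); // prints 2
   }
}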
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -304,11 +304,14 @@
public void unregisterAcceptors()
{
List<String> acceptors = new ArrayList<String>();
- for (String resourceName : registry.keySet())
+ synchronized (this)
{
- if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
+ for (String resourceName : registry.keySet())
{
- acceptors.add(resourceName);
+ if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
+ {
+ acceptors.add(resourceName);
+ }
}
}
@@ -508,7 +511,7 @@
registry.put(resourceName, managedResource);
}
- public void unregisterFromRegistry(final String resourceName)
+ public synchronized void unregisterFromRegistry(final String resourceName)
{
registry.remove(resourceName);
}
@@ -618,7 +621,29 @@
messageCounterManager.clear();
}
+
+ listeners.clear();
+
+ registry.clear();
+ messagingServer = null;
+
+ securityRepository = null;
+
+ addressSettingsRepository = null;
+
+ messagingServerControl = null;
+
+ messageCounterManager = null;
+
+ postOffice = null;
+
+ pagingManager = null;
+
+ storageManager = null;
+
+ messagingServer = null;
+
registeredNames.clear();
started = false;
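unregisterAcceptors() now iterates the registry under the same monitor used by unregisterFromRegistry(), collecting the acceptor names first and removing them afterwards, and stop() additionally drops the service's references. A minimal sketch of that locking pattern follows, with hypothetical resource names.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Iterate the registry under the same monitor that guards removal, collect the
// matching names first, and only then unregister them one by one.
public class RegistrySketch
{
   private final Map<String, Object> registry = new HashMap<String, Object>();

   public synchronized void registerInRegistry(final String resourceName, final Object managedResource)
   {
      registry.put(resourceName, managedResource);
   }

   public synchronized void unregisterFromRegistry(final String resourceName)
   {
      registry.remove(resourceName);
   }

   public void unregisterAcceptors()
   {
      List<String> acceptors = new ArrayList<String>();
      synchronized (this)
      {
         for (String resourceName : registry.keySet())
         {
            if (resourceName.startsWith("core.acceptor."))
            {
               acceptors.add(resourceName);
            }
         }
      }
      for (String acceptor : acceptors)
      {
         unregisterFromRegistry(acceptor);
      }
   }

   public static void main(final String[] args)
   {
      RegistrySketch service = new RegistrySketch();
      service.registerInRegistry("core.acceptor.netty", new Object());
      service.unregisterAcceptors();
      System.out.println("acceptors unregistered");
   }
}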
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -65,6 +65,8 @@
*/
void clear();
+ void clearListeners();
+
void clearCache();
int getCacheSize();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -221,6 +221,11 @@
matches.clear();
}
+ public void clearListeners()
+ {
+ listeners.clear();
+ }
+
public void clearCache()
{
cache.clear();
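clearListeners() complements clearCache(): on server stop the address-settings repository can now drop its registered listeners as well, so callbacks do not outlive the server (the HornetQServerImpl change above calls both). A small illustrative sketch of such a clearable listener registry follows; the names are not the HornetQ ones.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// A repository that can drop all registered listeners in one call, so a
// stopping server does not leave stale callbacks behind.
public class ListenerRepositorySketch
{
   public interface SettingsListener
   {
      void onChange();
   }

   private final List<SettingsListener> listeners = new CopyOnWriteArrayList<SettingsListener>();

   public void registerListener(final SettingsListener listener)
   {
      listeners.add(listener);
   }

   public void clearListeners()
   {
      listeners.clear();
   }

   public int listenerCount()
   {
      return listeners.size();
   }

   public static void main(final String[] args)
   {
      ListenerRepositorySketch repository = new ListenerRepositorySketch();
      repository.registerListener(new SettingsListener()
      {
         public void onChange()
         {
            // no-op for the sketch
         }
      });
      repository.clearListeners();
      System.out.println(repository.listenerCount()); // prints 0
   }
}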
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSMessageCounterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -40,37 +40,49 @@
public void testMessageCounter() throws Exception
{
- Connection conn = cf.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = createQueue(true, "Test");
-
- MessageProducer producer = sess.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
+ try
{
- TextMessage mess = sess.createTextMessage("msg" + i);
- producer.send(mess);
+ Connection conn = cf.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = createQueue(true, "Test");
+
+ MessageProducer producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage mess = sess.createTextMessage("msg" + i);
+ producer.send(mess);
+ }
+
+ conn.close();
+
+ JMSQueueControl control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
+ assertNotNull(control);
+
+ System.out.println(control.listMessageCounterAsHTML());
+
+ jmsServer.stop();
+
+ restartServer();
+
+ control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
+ assertNotNull(control);
+
+ System.out.println(control.listMessageCounterAsHTML());
}
-
- conn.close();
-
- JMSQueueControl control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
- assertNotNull(control);
-
- System.out.println(control.listMessageCounterAsHTML());
-
- jmsServer.stop();
-
- restartServer();
-
- control = (JMSQueueControl)server.getManagementService().getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
- assertNotNull(control);
-
- System.out.println(control.listMessageCounterAsHTML());
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ jmsServer.stop();
+ }
}
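JMSMessageCounterTest, like the ProducerFlowControlTest methods below, is rewrapped so the embedded server is always stopped in a finally block, even when an assertion or exception interrupts the test body. A minimal sketch of that pattern with a stand-in server class:

// Run the test body inside try/finally so the embedded server is stopped even
// when an assertion or exception interrupts the test.
public class TryFinallyTestSketch
{
   static class EmbeddedServer
   {
      void start()
      {
         System.out.println("server started");
      }

      void stop()
      {
         System.out.println("server stopped");
      }
   }

   public static void runTest() throws Exception
   {
      EmbeddedServer server = new EmbeddedServer();
      server.start();
      try
      {
         // ... test body: create connections, send messages, assert counters ...
      }
      finally
      {
         server.stop(); // always release ports and threads for the next test
      }
   }

   public static void main(final String[] args) throws Exception
   {
      runTest();
   }
}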
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -217,137 +217,144 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(producerWindowSize);
- locator.setConsumerWindowSize(consumerWindowSize);
- locator.setAckBatchSize(ackBatchSize);
-
- if (minLargeMessageSize != -1)
+ try
{
- locator.setMinLargeMessageSize(minLargeMessageSize);
- }
- ClientSessionFactory sf = locator.createSessionFactory();
- ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(producerWindowSize);
+ locator.setConsumerWindowSize(consumerWindowSize);
+ locator.setAckBatchSize(ackBatchSize);
- session.start();
+ if (minLargeMessageSize != -1)
+ {
+ locator.setMinLargeMessageSize(minLargeMessageSize);
+ }
- final String queueName = "testqueue";
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < numConsumers; i++)
- {
- session.createQueue(address, new SimpleString(queueName + i), null, false);
- }
+ session.start();
- final byte[] bytes = RandomUtil.randomBytes(messageSize);
+ final String queueName = "testqueue";
- class MyHandler implements MessageHandler
- {
- int count = 0;
+ for (int i = 0; i < numConsumers; i++)
+ {
+ session.createQueue(address, new SimpleString(queueName + i), null, false);
+ }
- final CountDownLatch latch = new CountDownLatch(1);
+ final byte[] bytes = RandomUtil.randomBytes(messageSize);
- volatile Exception exception;
-
- public void onMessage(final ClientMessage message)
+ class MyHandler implements MessageHandler
{
- try
- {
- byte[] bytesRead = new byte[messageSize];
+ int count = 0;
- message.getBodyBuffer().readBytes(bytesRead);
+ final CountDownLatch latch = new CountDownLatch(1);
- UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
+ volatile Exception exception;
- message.acknowledge();
-
- if (++count == numMessages * numProducers)
+ public void onMessage(final ClientMessage message)
+ {
+ try
{
- latch.countDown();
- }
+ byte[] bytesRead = new byte[messageSize];
- if (consumerDelay > 0)
- {
- Thread.sleep(consumerDelay);
- }
+ message.getBodyBuffer().readBytes(bytesRead);
- }
- catch (Exception e)
- {
- ProducerFlowControlTest.log.error("Failed to handle message", e);
+ UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
- exception = e;
+ message.acknowledge();
- latch.countDown();
- }
- }
- }
+ if (++count == numMessages * numProducers)
+ {
+ latch.countDown();
+ }
- MyHandler[] handlers = new MyHandler[numConsumers];
+ if (consumerDelay > 0)
+ {
+ Thread.sleep(consumerDelay);
+ }
- for (int i = 0; i < numConsumers; i++)
- {
- handlers[i] = new MyHandler();
+ }
+ catch (Exception e)
+ {
+ ProducerFlowControlTest.log.error("Failed to handle message", e);
- ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
+ exception = e;
- consumer.setMessageHandler(handlers[i]);
- }
+ latch.countDown();
+ }
+ }
+ }
- ClientProducer[] producers = new ClientProducer[numProducers];
+ MyHandler[] handlers = new MyHandler[numConsumers];
- for (int i = 0; i < numProducers; i++)
- {
- if (anon)
+ for (int i = 0; i < numConsumers; i++)
{
- producers[i] = session.createProducer();
- }
- else
- {
- producers[i] = session.createProducer(address);
- }
- }
+ handlers[i] = new MyHandler();
- long start = System.currentTimeMillis();
+ ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ consumer.setMessageHandler(handlers[i]);
+ }
- message.getBodyBuffer().writeBytes(bytes);
+ ClientProducer[] producers = new ClientProducer[numProducers];
- for (int j = 0; j < numProducers; j++)
+ for (int i = 0; i < numProducers; i++)
{
if (anon)
{
- producers[j].send(address, message);
+ producers[i] = session.createProducer();
}
else
{
- producers[j].send(message);
+ producers[i] = session.createProducer(address);
}
+ }
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ for (int j = 0; j < numProducers; j++)
+ {
+ if (anon)
+ {
+ producers[j].send(address, message);
+ }
+ else
+ {
+ producers[j].send(message);
+ }
+
+ }
}
- }
- for (int i = 0; i < numConsumers; i++)
- {
- assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
+ for (int i = 0; i < numConsumers; i++)
+ {
+ Assert.assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
- Assert.assertNull(handlers[i].exception);
- }
+ Assert.assertNull(handlers[i].exception);
+ }
- long end = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
- double rate = 1000 * (double)numMessages / (end - start);
+ double rate = 1000 * (double)numMessages / (end - start);
- ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
+ ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
- session.close();
+ session.close();
- sf.close();
-
- server.stop();
+ sf.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testClosingSessionUnblocksBlockedProducer() throws Exception
@@ -364,56 +371,63 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
- final ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final SimpleString queueName = new SimpleString("testqueue");
+ ClientSessionFactory sf = locator.createSessionFactory();
+ final ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(address, queueName, null, false);
+ final SimpleString queueName = new SimpleString("testqueue");
- ClientProducer producer = session.createProducer(address);
+ session.createQueue(address, queueName, null, false);
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer(address);
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final AtomicBoolean closed = new AtomicBoolean(false);
+ message.getBodyBuffer().writeBytes(bytes);
- Thread t = new Thread(new Runnable()
- {
- public void run()
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ Thread t = new Thread(new Runnable()
{
- try
+ public void run()
{
- Thread.sleep(500);
+ try
+ {
+ Thread.sleep(500);
- closed.set(true);
+ closed.set(true);
- session.close();
+ session.close();
+ }
+ catch (Exception e)
+ {
+ }
}
- catch (Exception e)
- {
- }
- }
- });
+ });
- t.start();
+ t.start();
- // This will block
- producer.send(message);
+ // This will block
+ producer.send(message);
- Assert.assertTrue(closed.get());
+ Assert.assertTrue(closed.get());
- t.join();
-
- server.stop();
+ t.join();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testFlowControlMessageNotRouted() throws Exception
@@ -430,33 +444,40 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientProducer producer = session.createProducer(address);
+ final ClientSession session = sf.createSession(false, true, true, true);
- byte[] bytes = new byte[100];
+ ClientProducer producer = session.createProducer(address);
- final int numMessages = 1000;
+ byte[] bytes = new byte[100];
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ final int numMessages = 1000;
- message.getBodyBuffer().writeBytes(bytes);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
- producer.send(message);
- }
+ message.getBodyBuffer().writeBytes(bytes);
- session.close();
+ producer.send(message);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
// Not technically a flow control test, but what the hell
@@ -465,66 +486,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
- session.createQueue("address", "queue2", null, false);
- session.createQueue("address", "queue3", null, false);
- session.createQueue("address", "queue4", null, false);
- session.createQueue("address", "queue5", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientConsumer consumer1 = session.createConsumer("queue1");
- ClientConsumer consumer2 = session.createConsumer("queue2");
- ClientConsumer consumer3 = session.createConsumer("queue3");
- ClientConsumer consumer4 = session.createConsumer("queue4");
- ClientConsumer consumer5 = session.createConsumer("queue5");
+ session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue2", null, false);
+ session.createQueue("address", "queue3", null, false);
+ session.createQueue("address", "queue4", null, false);
+ session.createQueue("address", "queue5", null, false);
- ClientProducer producer = session.createProducer("address");
+ ClientConsumer consumer1 = session.createConsumer("queue1");
+ ClientConsumer consumer2 = session.createConsumer("queue2");
+ ClientConsumer consumer3 = session.createConsumer("queue3");
+ ClientConsumer consumer4 = session.createConsumer("queue4");
+ ClientConsumer consumer5 = session.createConsumer("queue5");
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer("address");
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final int numMessages = 1000;
+ message.getBodyBuffer().writeBytes(bytes);
- for (int i = 0; i < numMessages; i++)
- {
- producer.send(message);
- }
+ final int numMessages = 1000;
- session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage msg = consumer1.receive(1000);
+ session.start();
- Assert.assertNotNull(msg);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = consumer1.receive(1000);
- msg = consumer2.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer2.receive(5000);
- msg = consumer3.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer3.receive(5000);
- msg = consumer4.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer4.receive(5000);
- msg = consumer5.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
- }
+ msg = consumer5.receive(5000);
- session.close();
+ Assert.assertNotNull(msg);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching1() throws Exception
@@ -532,35 +560,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching2() throws Exception
@@ -568,37 +604,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching3() throws Exception
@@ -606,35 +650,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching4() throws Exception
@@ -642,37 +694,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching5() throws Exception
@@ -680,63 +740,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- List<ClientProducerCredits> creditsList = new ArrayList<ClientProducerCredits>();
+ ClientProducerCredits credits = null;
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ List<ClientProducerCredits> creditsList = new ArrayList<ClientProducerCredits>();
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- if (credits != null)
- {
- assertFalse(newCredits == credits);
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+
+ creditsList.add(credits);
}
- credits = newCredits;
+ Iterator<ClientProducerCredits> iter = creditsList.iterator();
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- creditsList.add(credits);
- }
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- Iterator<ClientProducerCredits> iter = creditsList.iterator();
+ Assert.assertTrue(newCredits == iter.next());
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
- assertTrue(newCredits == iter.next());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.close();
}
-
- for (int i = 0; i < 10; i++)
+ finally
{
- ClientProducer prod = session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
-
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ server.stop();
}
-
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching6() throws Exception
@@ -744,26 +814,35 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address", session.createMessage(false));
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod.send("address", session.createMessage(false));
+
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
+
+ session.close();
}
+ finally
+ {
+ server.stop();
+ }
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching7() throws Exception
@@ -771,50 +850,58 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- prod.send("address2-" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
- ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address2-" + i, session.createMessage(false));
- session.close();
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+ ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsRefCounting() throws Exception
@@ -822,43 +909,50 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducer prod1 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.createQueue("address", "queue1", null, false);
- ClientProducer prod2 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod1 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- ClientProducer prod3 = session.createProducer("address");
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod2 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- prod1.close();
+ ClientProducer prod3 = session.createProducer("address");
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod1.close();
- prod2.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod2.close();
- prod3.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod3.close();
- session.close();
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
}
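The recurring edit in the producer-credits tests above is one guard pattern: block on waitForServer before creating a session factory, and wrap the client work in try/finally so server.stop() runs even when an assertion fails. A condensed sketch of that skeleton, written as it would sit inside one of these ServiceTestBase subclasses (createServer, waitForServer, isNetty and the locator field are the helpers already visible in the diff, not new API):

   public void testProducerCreditsSkeleton() throws Exception
   {
      HornetQServer server = createServer(false, isNetty());
      server.start();
      waitForServer(server); // don't touch the server until it is fully started

      try
      {
         ClientSessionFactory sf = locator.createSessionFactory();
         ClientSession session = sf.createSession(false, true, true, true);

         session.createQueue("address", "queue1", null, false);

         // ... credit-manager assertions go here, as in the tests above ...

         session.close();
      }
      finally
      {
         // always runs, so a failed assertion can't leak the server into the next test
         server.stop();
      }
   }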
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -444,6 +444,7 @@
// Now we will simulate a failure of the bridge connection between server0 and server1
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+ assertNotNull(bridge);
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = reconnectAttempts - 1;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -20,10 +20,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -132,7 +137,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
+
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -303,6 +311,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -330,6 +339,8 @@
Thread.sleep(1000);
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -395,6 +406,7 @@
BridgeStartTest.log.info("sent some more messages");
server1.start();
+ waitForServer(server1);
BridgeStartTest.log.info("started server1");
@@ -514,6 +526,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -542,6 +555,8 @@
// JMSBridge should be stopped since retries = 0
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -665,8 +680,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -434,7 +434,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(2000);
Assert.assertNotNull(message);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -73,7 +73,7 @@
*/
public abstract class ClusterTestBase extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ private final Logger log = Logger.getLogger(this.getClass());
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
@@ -88,12 +88,14 @@
private static final long WAIT_TIMEOUT = 10000;
- private static final long TIMEOUT_START_SERVER = 500;
+ private static final long TIMEOUT_START_SERVER = 10;
@Override
protected void setUp() throws Exception
{
super.setUp();
+
+ forceGC();
UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
@@ -116,9 +118,6 @@
locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
- // To make sure the test will start with a clean VM
- forceGC();
-
}
@Override
@@ -247,7 +246,7 @@
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
String msg = "Timed out waiting for server starting = " + node;
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new IllegalStateException(msg);
}
@@ -283,7 +282,7 @@
topology +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new Exception(msg);
}
@@ -359,7 +358,7 @@
")" +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
@@ -772,7 +771,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -873,7 +872,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -920,7 +919,7 @@
{
if (consumers[i] != null && !consumers[i].consumer.isClosed())
{
- ClusterTestBase.log.info("Dumping consumer " + i);
+ log.info("Dumping consumer " + i);
checkReceive(i);
}
@@ -984,13 +983,13 @@
if (message != null)
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID +
+ log.info("check receive Consumer " + consumerID +
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
}
else
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID + " null message");
+ log.info("check receive Consumer " + consumerID + " null message");
}
}
while (message != null);
@@ -2023,20 +2022,19 @@
for (int node : nodes)
{
log.info("#test start node " + node);
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
- Thread.sleep(TIMEOUT_START_SERVER);
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
timeStarts[node] = System.currentTimeMillis();
servers[node].setIdentity("server " + node);
- ClusterTestBase.log.info("starting server " + servers[node]);
+ log.info("starting server " + servers[node]);
servers[node].start();
- ClusterTestBase.log.info("started server " + servers[node]);
+ log.info("started server " + servers[node]);
- ClusterTestBase.log.info("started server " + node);
+ log.info("started server " + node);
waitForServer(servers[node]);
@@ -2053,6 +2051,7 @@
for (ClusterConnection cc : servers[node].getClusterManager().getClusterConnections())
{
cc.stop();
+ cc.flushExecutor();
}
}
}
@@ -2068,23 +2067,22 @@
{
try
{
-// if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-// {
-// // We can't stop and start a node too fast (faster than what the Topology could realize about this
-// Thread.sleep(TIMEOUT_START_SERVER);
-// }
+ if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+ {
+ // We can't stop and start a node too fast (faster than what the Topology could realize about this)
+ Thread.sleep(TIMEOUT_START_SERVER);
+ }
- Thread.sleep(TIMEOUT_START_SERVER);
timeStarts[node] = System.currentTimeMillis();
- ClusterTestBase.log.info("stopping server " + node);
+ log.info("stopping server " + node);
servers[node].stop();
- ClusterTestBase.log.info("server " + node + " stopped");
+ log.info("server " + node + " stopped");
}
catch (Exception e)
{
- ClusterTestBase.log.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -19,10 +19,8 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -54,6 +54,11 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i < 5; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -97,6 +102,11 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i < 5; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -88,13 +88,6 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
- Thread.sleep(2000);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
- System.out.println(clusterDescription(servers[2]));
- System.out.println(clusterDescription(servers[3]));
- System.out.println(clusterDescription(servers[4]));
-
waitForBindings(0, "queues.testaddress", 1, 1, false);
send(0, "queues.testaddress", 10, false, null);
@@ -323,8 +316,9 @@
stopServers(2);
- Thread.sleep(2000);
+ waitForTopology(servers[1], 4);
+ Thread.sleep(1000);
log.info("============================================ after stop");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -332,9 +326,12 @@
log.info(clusterDescription(servers[4]));
startServers(2);
+
- Thread.sleep(2000);
+ Thread.sleep(1000);
+ waitForTopology(servers[1], 5);
+
log.info("============================================ after start");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -358,7 +355,6 @@
setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty(), true);
startServers(0, 1, 2, 3, 4);
- Thread.sleep(2000);
Set<ClusterConnection> connectionSet = getServer(0).getClusterManager().getClusterConnections();
assertNotNull(connectionSet);
assertEquals(1, connectionSet.size());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -72,7 +72,7 @@
startServers(0);
// Give it a little time for the bridge to try to start
- Thread.sleep(2000);
+ Thread.sleep(500);
stopServers(0);
}
@@ -102,7 +102,11 @@
public void testStartSourceServerBeforeTargetServer() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -124,6 +128,13 @@
public void testStopAndStartTarget() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -150,12 +161,14 @@
OnewayTwoNodeClusterTest.log.info("stopping server 1");
stopServers(1);
+
+ waitForTopology(servers[0], 1);
OnewayTwoNodeClusterTest.log.info("restarting server 1(" + servers[1].getIdentity() + ")");
startServers(1);
- //Thread.sleep(1000);
+ waitForTopology(servers[0], 2);
log.info("Server 1 id=" + servers[1].getNodeID());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -148,17 +148,22 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
startServers(0, 1, 2);
+
+ waitForTopology(servers[0], 3);
+ waitForTopology(servers[1], 3);
+ waitForTopology(servers[2], 3);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
+ }
- for (int i = 0; i < 10; i++)
- log.info("****************************");
for (int i = 0; i <= 2; i++)
{
log.info("*************************************\n " + servers[i] +
" topology:\n" +
servers[i].getClusterManager().getTopology().describe());
}
- for (int i = 0; i < 10; i++)
- log.info("****************************");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -180,19 +185,52 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
+
+ public void testSimple_TwoNodes() throws Exception
+ {
+ setupServer(0, false, isNetty());
+ setupServer(1, false, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ for (int i = 0; i <= 1; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
+ }
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ closeAllConsumers();
+
+ }
+
static int loopNumber;
public void _testLoop() throws Throwable
{
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 10; i++)
{
loopNumber = i;
log.info("#test " + i);
- testSimple2();
- if (i + 1 < 1000)
- {
- tearDown();
- setUp();
- }
+ testSimple();
+ tearDown();
+ setUp();
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -1085,9 +1085,14 @@
public void testRouteWhenNoConsumersFalseNoLocalConsumerLoadBalancedQueues() throws Exception
{
setupCluster(false);
-
+
startServers();
+ for (int i = 0 ; i <= 4; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -1241,10 +1246,6 @@
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
- // this.checkReceive(0, 1, 2, 3, 4);
-
- // Thread.sleep(300000);
-
verifyReceiveAll(10, 0, 1, 2, 3, 4);
}
@@ -1470,7 +1471,6 @@
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
- Thread.sleep(2000);
System.out.println("#####################################");
System.out.println(clusterDescription(servers[0]));
System.out.println(clusterDescription(servers[1]));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -128,7 +128,7 @@
stopServers(0, 1);
}
- public void testRestartTest() throws Throwable
+ public void testRestartServers() throws Throwable
{
String name = Thread.currentThread().getName();
try
@@ -136,14 +136,19 @@
Thread.currentThread().setName("ThreadOnTestRestartTest");
startServers(0, 1);
waitForTopology(servers[0], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < 10; i++)
{
+ Thread.sleep(10);
log.info("Sleep #test " + i);
log.info("#stop #test #" + i);
- Thread.sleep(500);
stopServers(1);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
@@ -182,17 +187,17 @@
verifyNotReceive(0, 1);
removeConsumer(1);
-
+
closeSessionFactory(1);
-
+
stopServers(1);
-
+
Thread.sleep(12000);
System.out.println(clusterDescription(servers[0]));
startServers(1);
-
+
Thread.sleep(3000);
System.out.println(clusterDescription(servers[0]));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -103,11 +103,14 @@
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
liveServer.start();
+
+ waitForServer(liveServer.getServer());
if (backupServer != null)
{
backupServer.setIdentity(this.getClass().getSimpleName() + "/backupServer");
backupServer.start();
+ waitForServer(backupServer.getServer());
}
}
@@ -433,7 +436,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
@@ -447,7 +450,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -34,6 +33,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -47,6 +48,7 @@
*/
public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
{
+ Logger log = Logger.getLogger(this.getClass());
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -102,7 +104,7 @@
}
}
}
-
+
try
{
Thread.sleep(100);
@@ -170,6 +172,13 @@
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
int topologyMembers) throws Exception
{
+ return createSessionFactoryAndWaitForTopology(locator, topologyMembers, null);
+ }
+
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int topologyMembers,
+ HornetQServer server) throws Exception
+ {
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
@@ -179,12 +188,15 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ locator.removeClusterTopologyListener(topListener);
if (!ok)
{
- System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ if (server != null)
+ {
+ log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+ }
}
- locator.removeClusterTopologyListener(topListener);
- assertTrue(ok);
+ assertTrue("expected " + topologyMembers + " members", ok);
return sf;
}
@@ -219,7 +231,10 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
@@ -233,7 +248,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -25,7 +25,6 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -42,7 +41,7 @@
{
for (TestableServer testableServer : servers.values())
{
- if(testableServer != null)
+ if (testableServer != null)
{
try
{
@@ -56,43 +55,51 @@
}
super.tearDown();
}
-
+
public void testMultipleFailovers2LiveServers() throws Exception
{
- // TODO: remove these sleeps
NodeManager nodeManager1 = new InVMNodeManager();
NodeManager nodeManager2 = new InVMNodeManager();
createLiveConfig(nodeManager1, 0, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 1, true, new int[] { 0, 2 }, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 2, true, new int[] { 0, 1 }, 3, 4, 5);
createLiveConfig(nodeManager2, 3, 0);
- createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
- createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
-
- Thread.sleep(500);
+ createBackupConfig(nodeManager2, 3, 4, true, new int[] { 3, 5 }, 0, 1, 2);
+ createBackupConfig(nodeManager2, 3, 5, true, new int[] { 3, 4 }, 0, 1, 2);
+
servers.get(0).start();
- Thread.sleep(500);
+ waitForServer(servers.get(0).getServer());
+
servers.get(3).start();
- Thread.sleep(500);
+ waitForServer(servers.get(3).getServer());
+
servers.get(1).start();
- Thread.sleep(500);
+ waitForServer(servers.get(1).getServer());
+
servers.get(2).start();
- Thread.sleep(500);
+
servers.get(4).start();
- Thread.sleep(500);
+ waitForServer(servers.get(4).getServer());
+
servers.get(5).start();
+
+ waitForServer(servers.get(4).getServer());
+
ServerLocator locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
ClientSession session = sendAndConsume(sf, true);
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ Thread.sleep(500);
servers.get(0).crash(session);
+ System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
+
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
@@ -139,11 +146,18 @@
}
}
- protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+ protected void createBackupConfig(NodeManager nodeManager,
+ int liveNode,
+ int nodeid,
+ boolean createClusterConnections,
+ int[] otherBackupNodes,
+ int... otherClusterNodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty())));
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(nodeid, isNetty())));
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
@@ -152,21 +166,36 @@
List<String> staticConnectors = new ArrayList<String>();
for (int node : otherBackupNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
staticConnectors.add(liveConnector.getName());
}
- TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(nodeid, isNetty()));
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
List<String> clusterNodes = new ArrayList<String>();
for (int node : otherClusterNodes)
{
- TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration connector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(connector.getName(), connector);
clusterNodes.add(connector.getName());
}
- ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, clusterNodes, false);
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ backupConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ clusterNodes,
+ false);
config1.getClusterConfigurations().add(ccc1);
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
@@ -177,25 +206,39 @@
servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
}
- protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
+ protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty())));
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(liveNode, isNetty())));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
List<String> pairs = new ArrayList<String>();
for (int node : otherLiveNodes)
{
- TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector);
- pairs.add(otherLiveConnector.getName());
+ pairs.add(otherLiveConnector.getName());
}
- ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ liveConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ pairs,
+ false);
config0.getClusterConfigurations().add(ccc0);
config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -204,7 +247,8 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
+ servers.put(liveNode,
+ new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -204,7 +204,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -222,7 +223,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -278,7 +279,8 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -289,7 +291,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -350,7 +352,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -361,7 +363,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
@@ -432,7 +434,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -443,7 +445,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
if (nodes.contains(nodeID))
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -12,9 +12,6 @@
*/
package org.hornetq.tests.integration.jms.bridge;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
@@ -31,6 +28,11 @@
*/
public class JMSBridgeReconnectionTest extends BridgeTestBase
{
+ /**
+ *
+ */
+ private static final int TIME_WAIT = 5000;
+
private static final Logger log = Logger.getLogger(JMSBridgeReconnectionTest.class);
// Crash and reconnect
@@ -175,8 +177,6 @@
bridge.stop();
Assert.assertFalse(bridge.isStarted());
-
- // Thread.sleep(3000);
// we restart and setup the server for the test's tearDown checks
jmsServer1.start();
@@ -245,7 +245,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
@@ -337,7 +337,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -296,7 +296,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
+ public void nodeUP(final long uniqueEventID, String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
@@ -312,7 +312,7 @@
}
}
- public void nodeDown(String nodeID)
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
// To change body of implemented methods use File | Settings | File Templates.
}
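The listener edits repeated in FailoverTestBase, MultipleBackupsFailoverTestBase, TopologyClusterTestBase and JMSUtil above all track one interface change: the ClusterTopologyListener callbacks now take a uniqueEventID as their first argument. A small sketch of a listener written against the new signatures, in the latch-counting style these tests use; the import path for ClusterTopologyListener is an assumption for this sketch, while Pair and TransportConfiguration are the org.hornetq.api.core types already imported in the diffs:

   import java.util.concurrent.CountDownLatch;

   import org.hornetq.api.core.Pair;
   import org.hornetq.api.core.TransportConfiguration;
   import org.hornetq.api.core.client.ClusterTopologyListener; // package assumed for this sketch

   public class LatchTopologyListener implements ClusterTopologyListener
   {
      private final CountDownLatch latch;

      public LatchTopologyListener(final CountDownLatch latch)
      {
         this.latch = latch;
      }

      public void nodeUP(final long uniqueEventID,
                         final String nodeID,
                         final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                         final boolean last)
      {
         // one count per announced node; the test awaits the latch before asserting topology size
         latch.countDown();
      }

      public void nodeDown(final long uniqueEventID, final String nodeID)
      {
         // not needed for the wait-for-topology checks
      }
   }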
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -2,6 +2,7 @@
import junit.framework.Assert;
+import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.hornetq.tests.util.UnitTestCase;
@@ -15,6 +16,16 @@
*/
public class SpringIntegrationTest extends UnitTestCase
{
+ Logger log = Logger.getLogger(SpringIntegrationTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Need to force GC as the connection on the spring needs to be cleared
+ // otherwise the spring thread may leak here
+ forceGC();
+ }
+
public void testSpring() throws Exception
{
System.out.println("Creating bean factory...");
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -113,9 +113,11 @@
jmsServer1.start();
jmsServer1.activated();
+ waitForServer(jmsServer1.getHornetQServer());
jmsServer2.start();
jmsServer2.activated();
+ waitForServer(jmsServer2.getHornetQServer());
cf1 = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(),
generateInVMParams(0)));
@@ -228,6 +230,8 @@
log.warn("Can't stop server2", e);
}
+ Thread.sleep(500);
+
((HornetQConnectionFactory)cf1).close();
((HornetQConnectionFactory)cf2).close();
@@ -250,9 +254,9 @@
}
catch (Throwable e)
{
- log.warn("Can't stop server2", e);
+ log.warn("Can't stop server1", e);
}
-
+
server1 = null;
jmsServer1 = null;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -122,6 +122,7 @@
Configuration conf = createDefaultConfig(false);
+ conf.getAcceptorConfigurations().clear();
conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
@@ -181,9 +182,6 @@
mbeanServer = null;
super.tearDown();
-
-
- super.tearDown();
}
// Private -------------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-09 18:38:31 UTC (rev 11313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-09 20:31:28 UTC (rev 11314)
@@ -571,7 +571,7 @@
}
catch (Exception e)
{
- throw new IllegalStateException("port " + port + " is already bound");
+ throw new IllegalStateException("port " + port + " is bound");
}
finally
{
@@ -967,6 +967,8 @@
logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" +
this.getName() + "\n" + buffer.toString());
logAndSystemOut("Thread leakage");
+
+ fail("Thread leaked");
}
super.tearDown();
JBoss hornetq SVN: r11313 - branches/Branch_2_2_EAP/hornetq-rest.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-09 14:38:31 -0400 (Fri, 09 Sep 2011)
New Revision: 11313
Modified:
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
Fixing POM at rest
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-09-09 15:56:27 UTC (rev 11312)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-09-09 18:38:31 UTC (rev 11313)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.3.GA</hornetq.version>
+ <hornetq.version>2.2.8.CR1</hornetq.version>
</properties>
<licenses>
JBoss hornetq SVN: r11312 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-09 11:56:27 -0400 (Fri, 09 Sep 2011)
New Revision: 11312
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
HORNETQ-765 - Fix on paging / depaging with GC in place
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -47,6 +47,8 @@
private Long deliveryTime = null;
private int persistedCount;
+
+ private int messageEstimate;
private AtomicInteger deliveryCount = new AtomicInteger(0);
@@ -84,6 +86,7 @@
final PageSubscription subscription)
{
this.position = position;
+ this.messageEstimate = message.getMessage().getMemoryEstimate();
this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -102,8 +105,18 @@
{
return persistedCount;
}
+
/* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return messageEstimate;
+ }
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
*/
public MessageReference copy(final Queue queue)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -27,4 +27,6 @@
HandleStatus handle(MessageReference reference) throws Exception;
Filter getFilter();
+
+ String debug();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -32,6 +32,13 @@
boolean isPaged();
ServerMessage getMessage();
+
+ /**
+ * We define this method aggregation here because on paging we need to hold the original estimate,
+ * so we need to perform some extra steps on paging.
+ * @return
+ */
+ int getMessageMemoryEstimate();
MessageReference copy(Queue queue);
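The PagedReferenceImpl and MessageReference changes above work together: a paged reference only holds its PagedMessage through a WeakReference, so the memory estimate is captured once in the constructor and exposed via getMessageMemoryEstimate(). That keeps the queueMemorySize accounting in QueueImpl symmetric even if the paged message has been collected in the meantime, which appears to be the point of the "paging / depaging with GC in place" fix. A standalone toy illustration of that accounting pattern (not HornetQ code):

   import java.lang.ref.WeakReference;
   import java.util.concurrent.atomic.AtomicInteger;

   class PagedRefSketch
   {
      private final WeakReference<byte[]> payload; // stands in for WeakReference<PagedMessage>
      private final int memoryEstimate;            // captured once, survives GC of the payload

      PagedRefSketch(final byte[] payload)
      {
         this.payload = new WeakReference<byte[]>(payload);
         this.memoryEstimate = payload.length;
      }

      int getMessageMemoryEstimate()
      {
         return memoryEstimate;
      }
   }

   class QueueAccountingSketch
   {
      private final AtomicInteger queueMemorySize = new AtomicInteger(0);

      void addTail(final PagedRefSketch ref)
      {
         queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
      }

      void acknowledge(final PagedRefSketch ref)
      {
         // decrement by exactly what was added, even if the weak payload is already gone
         queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
      }
   }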
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -222,6 +222,11 @@
notificationService.sendNotification(notification);
}
}
+
+ public String debug()
+ {
+ return toString();
+ }
private void cancelRefs()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -76,6 +76,11 @@
{
return null;
}
+
+ public String debug()
+ {
+ return toString();
+ }
public synchronized void start()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -260,5 +260,13 @@
{
ref.getQueue().acknowledge(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return ref.getMessage().getMemoryEstimate();
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -186,7 +186,17 @@
{
queue.acknowledge(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return message.getMemoryEstimate();
+ }
+
+
// Public --------------------------------------------------------
@Override
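The QueueImpl diff below renames concurrentQueue to intermediateMessageReferences and documents the intent: producers append to a ConcurrentLinkedQueue without taking the queue lock, and the references are only moved onto the prioritized messageReferences list later, on the delivery side. A rough standalone sketch of that staging pattern (the drain step is simplified; the real class moves entries into a PriorityLinkedList from its delivery executor):

   import java.util.LinkedList;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicInteger;

   class StagingQueueSketch<E>
   {
      // producers append here without contending on the queue lock
      private final ConcurrentLinkedQueue<E> intermediateMessageReferences = new ConcurrentLinkedQueue<E>();

      // owned by the delivery thread (stands in for the PriorityLinkedList)
      private final LinkedList<E> messageReferences = new LinkedList<E>();

      private final AtomicInteger queueMemorySize = new AtomicInteger(0);

      // called from any producer thread
      void addTail(final E ref, final int memoryEstimate)
      {
         queueMemorySize.addAndGet(memoryEstimate);
         intermediateMessageReferences.add(ref);
      }

      // called only from the delivery/executor thread before delivering
      void drainIntermediate()
      {
         E ref;
         while ((ref = intermediateMessageReferences.poll()) != null)
         {
            messageReferences.addLast(ref);
         }
      }
   }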
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -75,7 +77,7 @@
public class QueueImpl implements Queue
{
private static final Logger log = Logger.getLogger(QueueImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -107,15 +109,20 @@
private final LinkedListIterator<PagedReference> pageIterator;
- private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
+ // Messages will first enter intermediateMessageReferences
+ // Before they are added to messageReferences
+ // This is to avoid locking the queue on the producer
+ private final ConcurrentLinkedQueue<MessageReference> intermediateMessageReferences = new ConcurrentLinkedQueue<MessageReference>();
+ // This is where messages are stored
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
- // The quantity of pagedReferences on messageREferences priority list
+ // The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
+ private final AtomicInteger queueInstances = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -170,6 +177,48 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
+
+ public String debug()
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ out.println("queueMemorySize=" + queueMemorySize);
+
+ for (ConsumerHolder holder : consumerList)
+ {
+ out.println("consumer: " + holder.consumer.debug());
+ }
+
+ for (MessageReference reference : intermediateMessageReferences)
+ {
+ out.print("Intermediate reference:" + reference);
+ }
+
+ if (intermediateMessageReferences.isEmpty())
+ {
+ out.println("No intermediate references");
+ }
+
+ boolean foundRef = false;
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ foundRef = true;
+ out.println("reference = " + iter.next());
+ }
+
+ if (!foundRef)
+ {
+ out.println("No permanent references on queue");
+ }
+
+
+
+ System.out.println(str.toString());
+
+ return str.toString();
+ }
public QueueImpl(final long id,
final SimpleString address,
@@ -342,7 +391,7 @@
public synchronized void reload(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
{
internalAddTail(ref);
@@ -376,7 +425,7 @@
if (checkDirect)
{
if (direct && !directDeliver &&
- concurrentQueue.isEmpty() &&
+ intermediateMessageReferences.isEmpty() &&
messageReferences.isEmpty() &&
!pageIterator.hasNext() &&
!pageSubscription.isPaging())
@@ -397,9 +446,10 @@
return;
}
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ // We only add queueMemorySize if not being delivered directly
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
- concurrentQueue.add(ref);
+ intermediateMessageReferences.add(ref);
directDeliver = false;
@@ -498,6 +548,11 @@
public synchronized void addConsumer(final Consumer consumer) throws Exception
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " adding consumer " + consumer);
+ }
+
cancelRedistributor();
if (consumer.getFilter() != null)
@@ -1534,14 +1589,14 @@
*/
private void internalAddHead(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
private synchronized void doPoll()
{
- MessageReference ref = concurrentQueue.poll();
+ MessageReference ref = intermediateMessageReferences.poll();
if (ref != null)
{
@@ -1564,6 +1619,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+ }
int busyCount = 0;
@@ -1698,6 +1758,10 @@
if (nullRefCount + busyCount == size)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up now");
+ }
break;
}
@@ -1723,7 +1787,7 @@
*/
private void refRemoved(MessageReference ref)
{
- queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
if (ref.isPaged())
{
pagedReferences.decrementAndGet();
@@ -1773,6 +1837,8 @@
log.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
+ this.directDeliver = false;
+
int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
@@ -1786,14 +1852,26 @@
pageIterator.remove();
}
- if (isTrace)
+ if (log.isDebugEnabled())
{
if (depaged == 0 && queueMemorySize.get() >= maxSize)
{
- log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
+ log.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
}
-
- log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Queue Memory Size after depage on queue=" + this.getName() +
+ " is " +
+ queueMemorySize.get() +
+ " with maxSize = " +
+ maxSize +
+ ". Depaged " +
+ depaged +
+ " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() +
+ ", queueDelivering=" + deliveringCount.get());
+
+ }
}
deliverAsync();
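A note on the QueueImpl changes above: the comment on intermediateMessageReferences describes a two-stage add, where producers append to a lock-free ConcurrentLinkedQueue and only the delivery side takes the queue lock while draining references into the priority list (doPoll). The sketch below shows that pattern with generic names and plain JDK types; it is a simplified illustration, not the real QueueImpl.

import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the two-stage add: producers never take the lock; only the
// single delivery thread synchronizes while moving references across.
final class TwoStageBuffer<T>
{
   private final ConcurrentLinkedQueue<T> intermediate = new ConcurrentLinkedQueue<T>();

   private final ArrayDeque<T> ordered = new ArrayDeque<T>();

   // Producer side: lock-free append.
   public void addTail(final T ref)
   {
      intermediate.add(ref);
   }

   // Delivery side: called from a single executor, like QueueImpl.doPoll().
   public synchronized void drainOne()
   {
      T ref = intermediate.poll();
      if (ref != null)
      {
         ordered.addLast(ref);
      }
   }

   public synchronized T peekHead()
   {
      return ordered.peekFirst();
   }
}

The depage loop shown in the same hunk then refills messageReferences only while queueMemorySize stays below the address max-size, which is why the memory estimate is tracked per reference.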
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -66,13 +66,6 @@
// Static ---------------------------------------------------------------------------------------
- private static final boolean trace = ServerConsumerImpl.log.isTraceEnabled();
-
- private static void trace(final String message)
- {
- ServerConsumerImpl.log.trace(message);
- }
-
// Attributes -----------------------------------------------------------------------------------
private final long id;
@@ -92,6 +85,11 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
+
+ public String debug()
+ {
+ return toString() + "::Delivering " + this.deliveringRefs.size();
+ }
/**
* if we are a browse-only consumer we don't need to worry about acknowledgements or being started/stopped by the session.
@@ -212,6 +210,11 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isDebugEnabled() )
+ {
+ log.debug(this + " is busy for the lack of credits!!!");
+ }
+
return HandleStatus.BUSY;
}
@@ -507,9 +510,9 @@
{
int previous = availableCredits.getAndAdd(credits);
- if (ServerConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ServerConsumerImpl.trace("Received " + credits +
+ log.debug(this + "::Received " + credits +
" credits, previous value = " +
previous +
" currentValue = " +
@@ -518,6 +521,10 @@
if (previous <= 0 && previous + credits > 0)
{
+ if (log.isTraceEnabled() )
+ {
+ log.trace(this + "::calling promptDelivery from receiving credits");
+ }
promptDelivery();
}
}
@@ -793,9 +800,9 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
}
return false;
@@ -818,9 +825,9 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Sending " + packetSize +
+ log.trace("deliverLargeMessage: Sending " + packetSize +
" availableCredits now is " +
availableCredits);
}
@@ -840,9 +847,9 @@
}
}
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("Finished deliverLargeMessage");
+ log.trace("Finished deliverLargeMessage");
}
finish();
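The ServerConsumerImpl hunks above revolve around consumer credits: the consumer reports BUSY while availableCredits is exhausted, and promptDelivery() is called only when the balance crosses from non-positive to positive. A minimal sketch of that transition check follows; the class is illustrative, not the real consumer.

import java.util.concurrent.atomic.AtomicInteger;

// Sketch: promptDelivery fires only on the transition from "no credits" to
// "credits available", so repeated top-ups do not schedule redundant runs.
final class CreditGate
{
   private final AtomicInteger availableCredits = new AtomicInteger(0);

   public boolean busy()
   {
      return availableCredits.get() <= 0;
   }

   public void receiveCredits(final int credits, final Runnable promptDelivery)
   {
      int previous = availableCredits.getAndAdd(credits);

      if (previous <= 0 && previous + credits > 0)
      {
         promptDelivery.run();
      }
   }

   public void useCredits(final int credits)
   {
      availableCredits.addAndGet(-credits);
   }
}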
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -54,6 +54,12 @@
private Filter filter = null;
private boolean isStarted = false;
+
+
+ public String debug()
+ {
+ return toString();
+ }
public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -0,0 +1,506 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A MultipleConsumersPageStressTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MultipleConsumersPageStressTest extends ServiceTestBase
+{
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final static int TIME_TO_RUN = 60 * 1000;
+
+ private static final SimpleString ADDRESS = new SimpleString("page-adr");
+
+ private int numberOfProducers;
+
+ private int numberOfConsumers;
+
+ private QueueImpl pagedServerQueue;
+
+ private boolean shareConnectionFactory = true;
+
+ private boolean openConsumerOnEveryLoop = true;
+
+ private HornetQServer messagingService;
+
+ private ServerLocator sharedLocator;
+
+ private ClientSessionFactory sharedSf;
+
+ final AtomicInteger messagesAvailable = new AtomicInteger(0);
+
+ private volatile boolean runningProducer = true;
+
+ private volatile boolean runningConsumer = true;
+
+ ArrayList<TestProducer> producers = new ArrayList<TestProducer>();
+
+ ArrayList<TestConsumer> consumers = new ArrayList<TestConsumer>();
+
+ ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ internalMultipleConsumers();
+ }
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Configuration config = createDefaultConfig();
+
+ HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+ // messagingService = createServer(true, config, 10024, 20024, settings);
+ messagingService = createServer(true, config, 10024, 200024, settings);
+ messagingService.start();
+
+ pagedServerQueue = (QueueImpl)messagingService.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ for (Tester tst : producers)
+ {
+ tst.close();
+ }
+ for (Tester tst : consumers)
+ {
+ tst.close();
+ }
+ sharedSf.close();
+ sharedLocator.close();
+ messagingService.stop();
+ super.tearDown();
+ }
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ System.out.println(pagedServerQueue.debug());
+
+ internalMultipleConsumers();
+
+ }
+
+ public void testReuseConsumersFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = false;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ try
+ {
+ internalMultipleConsumers();
+ }
+ catch (Throwable e)
+ {
+ TestConsumer tstConsumer = consumers.get(0);
+ System.out.println("first retry: " + tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ pagedServerQueue.forceDelivery();
+ System.out.println("Second retry: " + tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+
+ tstConsumer.session.commit();
+ System.out.println("Third retry:" + tstConsumer.consumer.receive(1000));
+
+ tstConsumer.close();
+
+ ClientSession session = sharedSf.createSession();
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ pagedServerQueue.forceDelivery();
+
+ System.out.println("Fourth retry: " + consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ throw e;
+ }
+
+ }
+
+ public void internalMultipleConsumers() throws Throwable
+ {
+ for (int i = 0; i < numberOfProducers; i++)
+ {
+ producers.add(new TestProducer());
+ }
+
+ for (int i = 0; i < numberOfConsumers; i++)
+ {
+ consumers.add(new TestConsumer());
+ }
+
+ for (Tester test : producers)
+ {
+ test.start();
+ }
+
+ Thread.sleep(2000);
+
+ for (Tester test : consumers)
+ {
+ test.start();
+ }
+
+ for (Tester test : consumers)
+ {
+ test.join();
+ }
+
+ runningProducer = false;
+
+ for (Tester test : producers)
+ {
+ test.join();
+ }
+
+ for (Throwable e : exceptions)
+ {
+ throw e;
+ }
+
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ abstract class Tester extends Thread
+ {
+ Random random = new Random();
+
+ public abstract void close();
+
+ protected abstract boolean enabled();
+
+ protected void exceptionHappened(final Throwable e)
+ {
+ runningConsumer = false;
+ runningProducer = false;
+ e.printStackTrace();
+ exceptions.add(e);
+ }
+
+ public int getNumberOfMessages() throws Exception
+ {
+ int numberOfMessages = random.nextInt(20);
+ if (numberOfMessages <= 0)
+ {
+ return 1;
+ }
+ else
+ {
+ return numberOfMessages;
+ }
+ }
+ }
+
+ class TestConsumer extends Tester
+ {
+
+ public ClientConsumer consumer = null;
+
+ public ClientSession session = null;
+
+ public ServerLocator locator = null;
+
+ public ClientSessionFactory sf = null;
+
+ @Override
+ public void close()
+ {
+ try
+ {
+
+ if (!openConsumerOnEveryLoop)
+ {
+ consumer.close();
+ }
+ session.rollback();
+ session.close();
+
+ if (!shareConnectionFactory)
+ {
+ sf.close();
+ locator.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningConsumer;
+ }
+
+ @Override
+ public int getNumberOfMessages() throws Exception
+ {
+ while (enabled())
+ {
+ int numberOfMessages = super.getNumberOfMessages();
+
+ int resultMessages = messagesAvailable.addAndGet(-numberOfMessages);
+
+ if (resultMessages < 0)
+ {
+ messagesAvailable.addAndGet(numberOfMessages); // give the failed reservation back
+ numberOfMessages = 0;
+ System.out.println("Negative, giving a little wait");
+ Thread.sleep(1000);
+ }
+
+ if (numberOfMessages > 0)
+ {
+ return numberOfMessages;
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ long timeOut = System.currentTimeMillis() + MultipleConsumersPageStressTest.TIME_TO_RUN;
+
+ session.start();
+
+ if (!openConsumerOnEveryLoop)
+ {
+ consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int count = 0;
+
+ while (enabled() && timeOut > System.currentTimeMillis())
+ {
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+ if (msg == null)
+ {
+ log.warn("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i);
+ }
+ Assert.assertNotNull("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i, msg);
+
+ if (numberOfConsumers == 1 && numberOfProducers == 1)
+ {
+ Assert.assertEquals(count, msg.getIntProperty("count").intValue());
+ }
+
+ count++;
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer.close();
+ }
+
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+
+ }
+ }
+
+ class TestProducer extends Tester
+ {
+ ClientSession session = null;
+
+ ClientSessionFactory sf = null;
+
+ ServerLocator locator = null;
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ session.rollback();
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningProducer;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ ClientProducer prod = session.createProducer(MultipleConsumersPageStressTest.ADDRESS);
+
+ int count = 0;
+
+ while (enabled())
+ {
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putStringProperty("Test", "This is a simple test");
+ msg.putIntProperty("count", count++);
+ prod.send(msg);
+ }
+
+ messagesAvailable.addAndGet(numberOfMessages);
+ session.commit();
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+ }
+ }
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-09 15:56:27 UTC (rev 11312)
@@ -55,6 +55,11 @@
return filter;
}
+ public String debug()
+ {
+ return toString();
+ }
+
public synchronized MessageReference waitForNextReference(long timeout)
{
while (references.isEmpty() && timeout > 0)
JBoss hornetq SVN: r11311 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-09 11:07:29 -0400 (Fri, 09 Sep 2011)
New Revision: 11311
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Remove cast to JournalImpl and add necessary methods to Journal
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -368,8 +368,8 @@
JournalFile[] messageFiles = null;
JournalFile[] bindingsFiles = null;
- final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ final Journal localMessageJournal = messageJournal;
+ final Journal localBindingsJournal = bindingsJournal;
final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
@@ -462,9 +462,7 @@
for (SimpleString storeName : pagingManager.getStoreNames())
{
PagingStore store = pagingManager.getPageStore(storeName);
- List<Integer> ids = new ArrayList<Integer>();
info.put(storeName, store.getCurrentIds());
- // HORNETQ-720 XXX perhaps before? unnecessary?
store.forceAnotherPage();
}
return info;
@@ -521,7 +519,7 @@
}
}
- private JournalFile[] prepareJournalForCopy(JournalImpl journal, JournalContent contentType) throws Exception
+ private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent contentType) throws Exception
{
journal.setAutoReclaim(false);
/*
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -584,6 +584,42 @@
throw new UnsupportedOperationException("This method should only be called at a replicating backup");
}
+ @Override
+ public boolean getAutoReclaim()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeUnlock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAutoReclaim(boolean autoReclaim)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void forceMoveNextFile()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFile[] getDataFiles()
+ {
+ throw new UnsupportedOperationException();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -156,4 +156,38 @@
*/
JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception;
+ /**
+ * @return whether automatic reclaiming of Journal files is enabled
+ */
+ boolean getAutoReclaim();
+
+ /**
+ * Write lock the Journal. Necessary only during replication for backup synchronization.
+ */
+ void writeLock();
+
+ /**
+ * Write-unlock the Journal.
+ * @see Journal#writeLock()
+ */
+ void writeUnlock();
+
+ /**
+ * Sets whether the journal should auto-reclaim its internal files.
+ * @param autoReclaim
+ */
+ void setAutoReclaim(boolean autoReclaim);
+
+ /**
+ * Force the usage of a new {@link JournalFile}.
+ * @throws Exception
+ */
+ void forceMoveNextFile() throws Exception;
+
+ /**
+ * Returns the {@link JournalFile}s in use.
+ * @return
+ */
+ JournalFile[] getDataFiles();
+
}
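The methods promoted into Journal above exist so replication code can quiesce a journal and capture a stable file set without casting to JournalImpl. The sketch below shows how a caller such as prepareJournalForCopy() might combine them; it is a simplified illustration under assumptions, not the actual JournalStorageManager logic (there, the saved auto-reclaim value is restored only after the copy finishes, and error handling is more involved).

import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalFile;

// Illustrative helper: capture a stable set of journal files for backup sync.
final class JournalSnapshot
{
   static JournalFile[] capture(final Journal journal) throws Exception
   {
      boolean previousAutoReclaim = journal.getAutoReclaim();

      journal.setAutoReclaim(false); // keep files from being reclaimed mid-copy

      journal.writeLock();           // block appends while the file set is captured
      try
      {
         journal.forceMoveNextFile(); // later appends land in a fresh file
         return journal.getDataFiles();
      }
      finally
      {
         journal.writeUnlock();
         // the caller would restore previousAutoReclaim once the copy completes
      }
   }
}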
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -47,10 +47,6 @@
int getMaxAIO();
- void forceMoveNextFile() throws Exception;
-
- void setAutoReclaim(boolean autoReclaim);
-
void testCompact() throws Exception;
JournalFile getCurrentFile();
@@ -58,6 +54,4 @@
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
boolean checkReclaimStatus() throws Exception;
-
- JournalFile[] getDataFiles();
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -264,4 +264,40 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean getAutoReclaim()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeUnlock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAutoReclaim(boolean autoReclaim)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void forceMoveNextFile()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFile[] getDataFiles()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-09 12:43:32 UTC (rev 11310)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-09 15:07:29 UTC (rev 11311)
@@ -838,29 +838,61 @@
void
appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
{
- // TODO Auto-generated method stub
}
public void lineUpContex(IOCompletion callback)
{
- // TODO Auto-generated method stub
}
@Override
public JournalLoadInformation loadSyncOnly() throws Exception
{
- // TODO Auto-generated method stub
return null;
}
@Override
public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
{
- // TODO Auto-generated method stub
return null;
}
+ @Override
+ public boolean getAutoReclaim()
+ {
+ return false;
+ }
+
+ @Override
+ public void writeLock()
+ {
+
+ }
+
+ @Override
+ public void writeUnlock()
+ {
+
+ }
+
+ @Override
+ public void setAutoReclaim(boolean autoReclaim)
+ {
+
+ }
+
+ @Override
+ public void forceMoveNextFile() throws Exception
+ {
+
+ }
+
+ @Override
+ public JournalFile[] getDataFiles()
+ {
+ return null;
+ }
+
}
}