JBoss hornetq SVN: r11400 - in trunk/tests: unit-tests/src/test/java/org/hornetq/tests/unit/util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-23 08:21:30 -0400 (Fri, 23 Sep 2011)
New Revision: 11400
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
Log:
Don't leave stuff open after running the test
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2011-09-23 03:24:21 UTC (rev 11399)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2011-09-23 12:21:30 UTC (rev 11400)
@@ -149,10 +149,10 @@
class TestHandler implements MessageHandler
{
- private CountDownLatch latch;
+ private final CountDownLatch latch;
int count = 0;
- private ClientSession clientSession;
+ private final ClientSession clientSession;
public TestHandler(CountDownLatch latch, ClientSession clientSession)
{
@@ -251,10 +251,12 @@
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(dla, dlq, null, false);
clientSession.createQueue(qName, qName, null, false);
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ final ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession sendSession = sessionFactory.createSession(false, true, true);
- ClientProducer producer = sendSession.createProducer(qName);
+ try
+ {
+ ClientProducer producer = sendSession.createProducer(qName);
Map<String, Long> origIds = new HashMap<String, Long>();
for (int i = 0; i < NUM_MESSAGES; i++)
@@ -320,7 +322,10 @@
}
sendSession.close();
-
+ } finally {
+ sessionFactory.close();
+ locator.close();
+ }
}
public void testDeadlLetterAddressWithDefaultAddressSettings() throws Exception
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2011-09-23 03:24:21 UTC (rev 11399)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2011-09-23 12:21:30 UTC (rev 11400)
@@ -26,7 +26,7 @@
import org.hornetq.tests.util.UnitTestCase;
/**
- *
+ *
* A SessionCloseOnGCTest
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
@@ -57,7 +57,7 @@
protected void tearDown() throws Exception
{
//locator.close();
-
+
server.stop();
server = null;
@@ -269,12 +269,13 @@
public void testCloseOneSessionOnGC() throws Exception
{
- ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
+ final ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
- ClientSession session = sf.createSession(false, true, true);
+ try
+ {
+ ClientSession session = sf.createSession(false, true, true);
+ WeakReference<ClientSession> wses = new WeakReference<ClientSession>(session);
- WeakReference<ClientSession> wses = new WeakReference<ClientSession>(session);
-
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
session = null;
@@ -284,12 +285,18 @@
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(1, sf.numConnections());
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
+ }
+ finally
+ {
+ sf.close();
+ }
}
public void testCloseSeveralSessionOnGC() throws Exception
{
- ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
-
+ final ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+ try
+ {
ClientSession session1 = sf.createSession(false, true, true);
ClientSession session2 = sf.createSession(false, true, true);
ClientSession session3 = sf.createSession(false, true, true);
@@ -309,6 +316,11 @@
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(1, sf.numConnections());
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
+
+ }
+ finally
+ {
+ sf.close();
+ }
}
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2011-09-23 03:24:21 UTC (rev 11399)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2011-09-23 12:21:30 UTC (rev 11400)
@@ -21,11 +21,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.tests.util.RandomUtil;
@@ -49,6 +52,8 @@
private ClientSessionFactory sf;
+ private ServerLocator locator;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -252,7 +257,9 @@
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator =
+ HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+ UnitTestCase.INVM_CONNECTOR_FACTORY));
sf = locator.createSessionFactory();
}
@@ -270,8 +277,13 @@
server.stop();
}
+ if (locator != null)
+ {
+ locator.close();
+ }
+
+ locator = null;
sf = null;
-
server = null;
super.tearDown();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-09-23 03:24:21 UTC (rev 11399)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-09-23 12:21:30 UTC (rev 11400)
@@ -38,7 +38,7 @@
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A ClientSessionFactoryTest
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@@ -50,7 +50,7 @@
{
private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
- private DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
+ private final DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
private HornetQServer liveService;
@@ -101,16 +101,22 @@
Assert.assertNotNull(csi);
csi.close();
-
+
locator.close();
}
public void testCloseUnusedClientSessionFactoryWithoutGlobalPools() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(liveTC);
-
- ClientSessionFactory csf = locator.createSessionFactory();
- csf.close();
+ try
+ {
+ ClientSessionFactory csf = locator.createSessionFactory();
+ csf.close();
+ }
+ finally
+ {
+ locator.close();
+ }
}
public void testDiscoveryConstructor() throws Exception
@@ -141,15 +147,15 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
-
+
locator.close();
}
@@ -182,13 +188,13 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
}
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2011-09-23 03:24:21 UTC (rev 11399)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2011-09-23 12:21:30 UTC (rev 11400)
@@ -57,6 +57,7 @@
payload = new byte[10 * 1024];
}
+ @Override
protected void finalize() throws Exception
{
count.decrementAndGet();
@@ -112,10 +113,8 @@
assertEquals(1000, count.get());
- int removed = 0;
while (iter.hasNext())
{
- System.out.println("removed " + (removed++));
iter.next();
iter.remove();
}
13 years, 3 months
JBoss hornetq SVN: r11399 - branches.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-22 23:24:21 -0400 (Thu, 22 Sep 2011)
New Revision: 11399
Added:
branches/Branch_2_2_EAP_hq782/
Log:
for HORNETQ-782
13 years, 3 months
JBoss hornetq SVN: r11398 - branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-09-22 16:11:24 -0400 (Thu, 22 Sep 2011)
New Revision: 11398
Modified:
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/Util.java
Log:
Adjust ResourceAdapter for discovery changes
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2011-09-22 19:24:22 UTC (rev 11397)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2011-09-22 20:11:24 UTC (rev 11398)
@@ -100,6 +100,8 @@
private Integer threadPoolMaxSize;
+ private Map<String, Object> discoveryPluginParameters;
+
/**
* @return the transportType
*/
@@ -119,6 +121,18 @@
hasBeenUpdated = true;
}
+ public Map<String, Object> getParsedDiscoveryPluginParameters()
+ {
+ return discoveryPluginParameters;
+ }
+
+ public void setParsedDiscoveryPluginParameters(final Map<String, Object> discoveryPluginParameters)
+ {
+ this.discoveryPluginParameters = discoveryPluginParameters;
+ hasBeenUpdated = true;
+ }
+
+
public void setParsedConnectorClassNames(final List<String> value)
{
connectorClassName = value;
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-22 19:24:22 UTC (rev 11397)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-22 20:11:24 UTC (rev 11398)
@@ -32,6 +32,7 @@
import javax.transaction.xa.XAResource;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -39,6 +40,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.ra.inflow.HornetQActivation;
@@ -91,6 +93,11 @@
private String unparsedConnectors;
/**
+ * The discovery plugin properties for resource adapter before parsing
+ */
+ private String unparsedDiscoveryPluginProperties;
+
+ /**
* Have the factory been configured
*/
private final AtomicBoolean configured;
@@ -287,6 +294,20 @@
this.raProperties.setHA(ha);
}
+ public String getDiscoveryPluginParameters()
+ {
+ return unparsedDiscoveryPluginProperties;
+ }
+
+ public void setDiscoveryPluginProperties(final String config)
+ {
+ if(config != null)
+ {
+ this.unparsedDiscoveryPluginProperties = config;
+ raProperties.setParsedDiscoveryPluginParameters(Util.parseDiscoveryPluginConfig(config));
+ }
+ }
+
/**
* Get the discovery group name
*
@@ -1417,6 +1438,8 @@
}
else if (discoveryAddress != null)
{
+ Map<String,Object> params = new HashMap<String,Object>();
+
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
@@ -1425,8 +1448,6 @@
discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
}
- DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
-
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
: raProperties.getDiscoveryRefreshTimeout();
if (refreshTimeout == null)
@@ -1442,10 +1463,13 @@
initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
- groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+ params.put(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, discoveryAddress);
+ params.put(DiscoveryGroupConstants.GROUP_PORT_NAME, discoveryPort);
+ params.put(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, initialTimeout);
+ params.put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, initialTimeout);
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(SimpleUDPServerLocatorImpl.class.getName(), params, null);
- groupConfiguration.setRefreshTimeout(refreshTimeout);
-
if (ha)
{
cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
@@ -1455,6 +1479,25 @@
cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
+ else if (this.unparsedDiscoveryPluginProperties != null)
+ {
+ // for another discovery strategy
+ Map<String, Object> discoveryPluginParams =
+ overrideConnectionParameters(overrideProperties.getParsedDiscoveryPluginParameters(),raProperties.getParsedDiscoveryPluginParameters());
+
+ String serverLocatorClassName = (String)discoveryPluginParams.get("server-locator-class");
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(serverLocatorClassName, discoveryPluginParams, null);
+
+ if (ha)
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ else
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ }
else
{
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/Util.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/Util.java 2011-09-22 19:24:22 UTC (rev 11397)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/ra/Util.java 2011-09-22 20:11:24 UTC (rev 11398)
@@ -199,6 +199,27 @@
return hashtable;
}
+ public static Map<String, Object> parseDiscoveryPluginConfig(final String config)
+ {
+ HashMap<String, Object> result = new HashMap<String, Object>();
+
+ String elements[] = config.split(";");
+
+ for (String element : elements)
+ {
+ String expression[] = element.split("=");
+
+ if (expression.length != 2)
+ {
+ throw new IllegalArgumentException("Invalid expression " + element + " at " + config);
+ }
+
+ result.put(expression[0].trim(), expression[1].trim());
+ }
+
+ return result;
+ }
+
public static List<Map<String, Object>> parseConfig(final String config)
{
List<Map<String, Object>> result =new ArrayList<Map<String, Object>>();
13 years, 3 months
JBoss hornetq SVN: r11397 - in branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq: api/core/client and 1 other directory.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-09-22 15:24:22 -0400 (Thu, 22 Sep 2011)
New Revision: 11397
Added:
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
Removed:
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
Log:
Divided the ServerLocatorImpl into 3 classes, AbstractServerLocator, StaticServerLocatorImpl and SimpleUDPServerLocatorImpl
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -14,6 +14,7 @@
package org.hornetq.api.core;
import java.io.Serializable;
+import java.util.Map;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.logging.Logger;
@@ -35,120 +36,36 @@
private static final Logger log = Logger.getLogger(DiscoveryGroupConfiguration.class);
- private String name;
+ private final String name;
- private String localBindAddress;
+ private final String serverLocatorClassName;
- private String groupAddress;
+ private final Map<String, Object> params;
- private int groupPort;
-
- private long refreshTimeout;
-
- private long discoveryInitialWaitTimeout;
-
- public DiscoveryGroupConfiguration(final String name,
- final String localBindAddress,
- final String groupAddress,
- final int groupPort,
- final long refreshTimeout,
- final long discoveryInitialWaitTimeout)
+ public DiscoveryGroupConfiguration(final String clazz, final Map<String, Object> params, final String name)
{
+ this.serverLocatorClassName = clazz;
+
+ this.params = params;
+
this.name = name;
- this.groupAddress = groupAddress;
- this.localBindAddress = localBindAddress;
- this.groupPort = groupPort;
- this.refreshTimeout = refreshTimeout;
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
}
- public DiscoveryGroupConfiguration(final String groupAddress,
- final int groupPort)
+ public String getServerLocatorClassName()
{
- this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
+ return this.serverLocatorClassName;
}
-
- public String getName()
- {
- return name;
- }
- public String getLocalBindAddress()
+ public Map<String, Object> getParams()
{
- return localBindAddress;
+ return this.params;
}
-
- public String getGroupAddress()
+
+ public String getName()
{
- return groupAddress;
+ return this.name;
}
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getRefreshTimeout()
- {
- return refreshTimeout;
- }
-
- /**
- * @param name the name to set
- */
- public void setName(final String name)
- {
- this.name = name;
- }
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAdress(final String localBindAddress)
- {
- this.localBindAddress = localBindAddress;
- }
-
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
- {
- this.groupAddress = groupAddress;
- }
-
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
- {
- this.groupPort = groupPort;
- }
-
- /**
- * @param refreshTimeout the refreshTimeout to set
- */
- public void setRefreshTimeout(final long refreshTimeout)
- {
- this.refreshTimeout = refreshTimeout;
- }
-
- /**
- * @return the discoveryInitialWaitTimeout
- */
- public long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- /**
- * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
- */
- public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
- {
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
@Override
public boolean equals(Object o)
{
@@ -157,11 +74,18 @@
DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
- if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
- if (groupPort != that.groupPort) return false;
- if (refreshTimeout != that.refreshTimeout) return false;
- if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
- if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
+ if (this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)
+ != that.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)) return false;
+ if (this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)
+ != that.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)) return false;
+ if (this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)
+ != that.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)) return false;
+ if (this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null
+ ? !this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME).equals(that.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME))
+ : that.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null) return false;
+ if (this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null
+ ? !this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME).equals(that.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME))
+ : that.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null)
return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
@@ -171,9 +95,16 @@
@Override
public int hashCode()
{
+ int groupPort = this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME) != null
+ ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)) : 0;
+ int refreshTimeout = this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME) != null
+ ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)) : 0;
+ int discoveryInitialWaitTimeout = this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME) != null
+ ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)) : 0;
+
int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
- result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
+ result = 31 * result + (this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null ? this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME).hashCode() : 0);
+ result = 31 * result + (this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null ? this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME).hashCode() : 0);
result = 31 * result + groupPort;
result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
@@ -186,17 +117,18 @@
@Override
public String toString()
{
- return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout +
+ return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" +
+ this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME) +
", groupAddress=" +
- groupAddress +
+ this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) +
", groupPort=" +
- groupPort +
+ this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME) +
", localBindAddress=" +
- localBindAddress +
+ this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) +
", name=" +
name +
", refreshTimeout=" +
- refreshTimeout +
+ this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME) +
"]";
}
Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ // for static discovery
+ public static final String STATIC_CONNECTORS_CONNECTOR_NAMES_NAME = "static-connector-names";
+ public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+
+ // for simple UDP discovery
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -12,10 +12,18 @@
*/
package org.hornetq.api.core.client;
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.UUIDGenerator;
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -28,6 +36,8 @@
*/
public class HornetQClient
{
+ private static final Logger log = Logger.getLogger(HornetQClient.class);
+
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
@@ -109,7 +119,10 @@
*/
public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
{
- return new ServerLocatorImpl(false, transportConfigurations);
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, Arrays.asList(transportConfigurations));
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, UUIDGenerator.getInstance().generateStringUUID());
+ return createServerLocatorWithoutHA(config);
}
/**
@@ -117,13 +130,27 @@
*
* The UDP address and port are used to listen for live servers in the cluster
*
- * @param discoveryAddress The UDP group address to listen for updates
- * @param discoveryPort the UDP port to listen for updates
+ * @param groupConfiguration The configuration for server discovery
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(false, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ log.fatal("Could not instantiate ServerLocator implementation class: ", e);
+ return null;
+ }
+
+ return serverLocator;
}
/**
@@ -137,7 +164,10 @@
*/
public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
{
- return new ServerLocatorImpl(true, initialServers);
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, Arrays.asList(initialServers));
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, UUIDGenerator.getInstance().generateStringUUID());
+ return createServerLocatorWithHA(config);
}
/**
@@ -152,7 +182,22 @@
*/
public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(true, groupConfiguration);
+ ServerLocator serverLocator = null;
+ // The implementation class to instantiate is named by the discovery group configuration.
+ String className = groupConfiguration.getServerLocatorClassName();
+
+ try{
+ // Load the class through the thread context class loader and invoke its
+ // (boolean, DiscoveryGroupConfiguration) constructor with the HA flag set to true.
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ // NOTE(review): every failure (class not found, missing constructor, constructor error)
+ // is logged and turned into a null return value -- confirm that callers check for null.
+ log.fatal("Could not instantiate ServerLocator implementation class", e);
+ return null;
+ }
+
+ return serverLocator;
}
Copied: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (from rev 11395, branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java)
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,1442 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * An AbstractServerLocator, derived from ServerLocatorImpl.
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, DiscoveryListener, Serializable
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+ private final boolean ha;
+
+ private boolean finalizeCheck = true;
+
+ private boolean clusterConnection;
+
+ private transient String identity;
+
+ private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+
+ private TransportConfiguration[] initialConnectors;
+
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+ private final Topology topology;
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+ private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
+
+ // if the system should shutdown the pool when shutting down
+ private transient boolean shutdownPool;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialConnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private volatile boolean closing;
+
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private Executor startExecutor;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
+
+ private String groupID;
+
+ private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ /**
+  * Shuts down the static global thread pools, waiting up to 10 seconds for each of them to
+  * terminate, and clears the static references so the pools are re-created lazily on next use.
+  * <p>
+  * Both pools are always shut down and cleared, even when the first one does not terminate in
+  * time. If the calling thread is interrupted while waiting, its interrupt status is restored.
+  *
+  * @throws IllegalStateException if a pool did not terminate within 10 seconds; when both
+  *                               pools time out the exception reports the first one
+  */
+ public static synchronized void clearThreadPools()
+ {
+    IllegalStateException failure = null;
+
+    if (globalThreadPool != null)
+    {
+       try
+       {
+          if (!shutdownAndAwaitTermination(globalThreadPool))
+          {
+             failure = new IllegalStateException("Couldn't finish the globalThreadPool");
+          }
+       }
+       finally
+       {
+          globalThreadPool = null;
+       }
+    }
+
+    if (globalScheduledThreadPool != null)
+    {
+       try
+       {
+          if (!shutdownAndAwaitTermination(globalScheduledThreadPool) && failure == null)
+          {
+             failure = new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
+          }
+       }
+       finally
+       {
+          globalScheduledThreadPool = null;
+       }
+    }
+
+    // Only report the timeout once both pools have been asked to stop.
+    if (failure != null)
+    {
+       throw failure;
+    }
+ }
+
+ /**
+  * Shuts the pool down and waits up to 10 seconds for it to terminate.
+  *
+  * @return {@code false} only if the wait timed out; an interrupted wait restores the
+  *         interrupt status and is not reported as a timeout
+  */
+ private static boolean shutdownAndAwaitTermination(final ExecutorService pool)
+ {
+    pool.shutdown();
+    try
+    {
+       return pool.awaitTermination(10, TimeUnit.SECONDS);
+    }
+    catch (InterruptedException interrupted)
+    {
+       Thread.currentThread().interrupt();
+       return true;
+    }
+ }
+
+ private static synchronized ExecutorService getGlobalThreadPool()
+ {
+ if (globalThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+ globalThreadPool = Executors.newCachedThreadPool(factory);
+ }
+
+ return globalThreadPool;
+ }
+
+ public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ {
+ if (globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+ true,
+ getThisClassLoader());
+
+ globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ factory);
+ }
+
+ return globalScheduledThreadPool;
+ }
+
+ /**
+  * Assigns the worker and scheduled pools used by this locator, unless a worker pool is
+  * already set. Uses the shared global pools when {@code useGlobalPools} is set, otherwise
+  * creates private pools and flags them to be shut down when the locator is closed.
+  */
+ private void setThreadPools()
+ {
+    // A pool has already been assigned: nothing to do.
+    if (threadPool != null)
+    {
+       return;
+    }
+
+    if (useGlobalPools)
+    {
+       threadPool = getGlobalThreadPool();
+       scheduledThreadPool = getGlobalScheduledThreadPool();
+       return;
+    }
+
+    // Private pools belong to this locator, so doClose() has to shut them down.
+    this.shutdownPool = true;
+
+    ThreadFactory workerFactory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+                                                           true,
+                                                           getThisClassLoader());
+
+    // -1 means "unbounded": use a cached pool instead of a fixed-size one.
+    threadPool = threadPoolMaxSize == -1 ? Executors.newCachedThreadPool(workerFactory)
+                                         : Executors.newFixedThreadPool(threadPoolMaxSize, workerFactory);
+
+    ThreadFactory pingerFactory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+                                                           true,
+                                                           getThisClassLoader());
+
+    scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, pingerFactory);
+ }
+
+ protected ExecutorService getThreadPool()
+ {
+ return this.threadPool;
+ }
+
+ protected ScheduledExecutorService getScheduledThreadPool()
+ {
+ return this.scheduledThreadPool;
+ }
+
+ private static ClassLoader getThisClassLoader()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
+ }
+
+ /**
+  * Creates the load balancing policy named by {@code connectionLoadBalancingPolicyClassName}
+  * using the thread context class loader, inside a privileged action.
+  *
+  * @throws IllegalStateException    if no policy class name is configured
+  * @throws IllegalArgumentException if the class cannot be loaded or instantiated
+  */
+ private void instantiateLoadBalancingPolicy()
+ {
+    if (connectionLoadBalancingPolicyClassName == null)
+    {
+       throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+    }
+
+    final PrivilegedAction<Object> createPolicy = new PrivilegedAction<Object>()
+    {
+       public Object run()
+       {
+          final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+          try
+          {
+             final Class<?> policyClass = contextLoader.loadClass(connectionLoadBalancingPolicyClassName);
+             loadBalancingPolicy = (ConnectionLoadBalancingPolicy)policyClass.newInstance();
+          }
+          catch (Exception cause)
+          {
+             throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+                                                "\"",
+                                                cause);
+          }
+          return null;
+       }
+    };
+
+    AccessController.doPrivileged(createPolicy);
+ }
+
+ /**
+  * Common constructor that every public constructor delegates to.
+  * <p>
+  * Stores the topology, HA flag, discovery group configuration and static connectors,
+  * generates a node ID and initialises each settable attribute from the corresponding
+  * {@link HornetQClient} default.
+  *
+  * @param topology                    the topology to use; the public constructors pass {@code null} when HA is off
+  * @param useHA                       whether this locator is highly available
+  * @param discoveryGroupConfiguration the discovery configuration, or {@code null} when static connectors are used
+  * @param transportConfigs            the static initial connectors, or {@code null} when discovery is used
+  */
+ private AbstractServerLocator(final Topology topology,
+                               final boolean useHA,
+                               final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+                               final TransportConfiguration[] transportConfigs)
+ {
+    e.fillInStackTrace();
+
+    this.topology = topology;
+    this.ha = useHA;
+    this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+    this.initialConnectors = transportConfigs;
+    this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+    // Connection liveness and timeouts
+    clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+    connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+    callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+    // Flow control
+    minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+    consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+    consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+    confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+    producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+    producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+    // Blocking and acknowledgement behaviour
+    blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+    blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+    blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+    autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+    preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+    ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+    // Load balancing and thread pools
+    connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+    useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+    scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+    threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+    // Retry and failover
+    retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+    retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+    maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+    reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+    initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+    failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+    // Message handling (cacheLargeMessagesClient used to be assigned twice; once is enough)
+    cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+    initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+    compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+    clusterConnection = false;
+ }
+
+ /**
+ * Creates a locator that is configured through a discovery group configuration.
+ * When {@code useHA} is true a new Topology is created and this locator is set as its owner;
+ * otherwise the locator keeps no topology.
+ *
+ * @param useHA whether the locator is highly available
+ * @param groupConfiguration the discovery group configuration to use
+ */
+ public AbstractServerLocator(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator that uses a static list of connectors.
+ * When {@code useHA} is true a new Topology is created and this locator is set as its owner;
+ * otherwise the locator keeps no topology.
+ *
+ * @param useHA whether the locator is highly available
+ * @param transportConfigs the initial connectors
+ */
+ public AbstractServerLocator(final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator that is configured through a discovery group configuration and
+ * uses a topology supplied by the caller. The owner of the topology is not changed here.
+ *
+ * @param topology the topology to use
+ * @param useHA whether the locator is highly available
+ * @param groupConfiguration the discovery group configuration to use
+ */
+ public AbstractServerLocator(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+
+ }
+
+ /**
+ * Creates a locator that uses a static list of connectors and a topology supplied by the
+ * caller. The owner of the topology is not changed here.
+ *
+ * @param topology the topology to use
+ * @param useHA whether the locator is highly available
+ * @param transportConfigs the initial connectors
+ */
+ public AbstractServerLocator(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
+ /**
+  * Picks the connector for the next connection attempt using the load balancing policy.
+  * Before a topology has been received the choice is made among the initial connectors;
+  * afterwards it is made among the first elements of the topology pairs.
+  *
+  * @return the selected connector
+  */
+ private TransportConfiguration selectConnector()
+ {
+    if (!receivedTopology)
+    {
+       // No topology yet: fall back to the initial connectors.
+       final int initialIndex = loadBalancingPolicy.select(initialConnectors.length);
+       return initialConnectors[initialIndex];
+    }
+
+    final int topologyIndex = loadBalancingPolicy.select(topologyArray.length);
+    return topologyArray[topologyIndex].a;
+ }
+
+ protected abstract void initialiseInternal() throws Exception;
+
+ /**
+  * Performs the one-time setup of this locator: thread pools, load balancing policy and the
+  * subclass-specific initialisation. Afterwards the locator is read-only, which makes the
+  * guarded setters fail and turns further calls into no-ops.
+  *
+  * @throws Exception if the subclass initialisation fails
+  */
+ private void initialise() throws Exception
+ {
+    if (readOnly)
+    {
+       // Already initialised.
+       return;
+    }
+
+    setThreadPools();
+    instantiateLoadBalancingPolicy();
+
+    initialiseInternal();
+
+    readOnly = true;
+ }
+
+ /**
+  * Initialises the locator, remembers the executor and submits a task to it that calls
+  * {@code connect()}. A failed connect is logged as a warning unless the locator is closing.
+  *
+  * @param executor the executor that runs the connect task
+  * @throws Exception if the initialisation fails
+  */
+ public void start(Executor executor) throws Exception
+ {
+    initialise();
+
+    startExecutor = executor;
+
+    final Runnable connectTask = new Runnable()
+    {
+       public void run()
+       {
+          try
+          {
+             connect();
+          }
+          catch (Exception failure)
+          {
+             if (closing)
+             {
+                // Failures while shutting down are expected; stay quiet.
+                return;
+             }
+             log.warn("did not connect the cluster connection to other nodes", failure);
+          }
+       }
+    };
+
+    executor.execute(connectTask);
+ }
+
+ public Executor getExecutor()
+ {
+ return startExecutor;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+ */
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
+ public boolean doFinalizeCheck()
+ {
+ return finalizeCheck;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
+ public boolean isClosed()
+ {
+ return closed || closing;
+ }
+
+ /**
+  * Creates a session factory connected to the given connector and registers it with this
+  * locator.
+  *
+  * @param transportConfiguration the connector to connect to
+  * @return the connected session factory
+  * @throws IllegalStateException if the locator is closed or in the middle of closing
+  * @throws HornetQException      with code INTERNAL_ERROR if the locator cannot be initialised
+  * @throws Exception             if the connection attempt fails
+  */
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+    // Same check as createSessionFactory() and isClosed(): a factory created while the locator
+    // is closing would be registered after doClose() has copied the factory set.
+    if (closed || closing)
+    {
+       throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+    }
+
+    try
+    {
+       initialise();
+    }
+    catch (Exception e)
+    {
+       throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+    }
+
+    ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+                                                                        transportConfiguration,
+                                                                        callTimeout,
+                                                                        clientFailureCheckPeriod,
+                                                                        connectionTTL,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        maxRetryInterval,
+                                                                        reconnectAttempts,
+                                                                        threadPool,
+                                                                        scheduledThreadPool,
+                                                                        interceptors);
+
+    factory.connect(reconnectAttempts, failoverOnInitialConnection);
+
+    addFactory(factory);
+
+    return factory;
+ }
+
+ protected abstract void waitInitialDiscovery() throws Exception;
+
+ /**
+  * Creates a session factory connected to a connector chosen by the load balancing policy,
+  * trying further connectors while the failure is NOT_CONNECTED. For HA or cluster
+  * connections it then waits up to 30 seconds for the cluster topology to arrive.
+  *
+  * @return the connected session factory
+  * @throws IllegalStateException if the locator is closed or in the middle of closing
+  * @throws HornetQException      INTERNAL_ERROR if initialisation fails, NOT_CONNECTED once every
+  *                               available server has been tried, CONNECTION_TIMEDOUT if no
+  *                               topology arrives in time, or the original connect failure
+  * @throws Exception             if the initial discovery fails
+  */
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+    if (closed || closing)
+    {
+       throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+    }
+
+    try
+    {
+       initialise();
+    }
+    catch (Exception e)
+    {
+       throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+    }
+
+    waitInitialDiscovery();
+
+    ClientSessionFactoryInternal factory = null;
+
+    synchronized (this)
+    {
+       boolean retry;
+       int attempts = 0;
+       do
+       {
+          retry = false;
+
+          TransportConfiguration tc = selectConnector();
+
+          // try each factory in the list until we find one which works
+
+          try
+          {
+             factory = new ClientSessionFactoryImpl(this,
+                                                    tc,
+                                                    callTimeout,
+                                                    clientFailureCheckPeriod,
+                                                    connectionTTL,
+                                                    retryInterval,
+                                                    retryIntervalMultiplier,
+                                                    maxRetryInterval,
+                                                    reconnectAttempts,
+                                                    threadPool,
+                                                    scheduledThreadPool,
+                                                    interceptors);
+             factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+          }
+          catch (HornetQException e)
+          {
+             // factory is still null when the constructor itself failed
+             if (factory != null)
+             {
+                factory.close();
+                factory = null;
+             }
+
+             if (e.getCode() != HornetQException.NOT_CONNECTED)
+             {
+                throw e;
+             }
+
+             attempts++;
+
+             // ">=" rather than "==": the connector list can change (and shrink) between
+             // attempts, and an exact match that is skipped over would retry forever.
+             if (topologyArray != null && attempts >= topologyArray.length)
+             {
+                throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                           "Cannot connect to server(s). Tried with all available servers.");
+             }
+             if (topologyArray == null && initialConnectors != null && attempts >= initialConnectors.length)
+             {
+                throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                           "Cannot connect to server(s). Tried with all available servers.");
+             }
+             retry = true;
+          }
+       }
+       while (retry);
+
+       if (ha || clusterConnection)
+       {
+          long timeout = System.currentTimeMillis() + 30000;
+          boolean interrupted = false;
+          try
+          {
+             while (!closed && !closing && !receivedTopology && timeout > System.currentTimeMillis())
+             {
+                // Now wait for the topology
+
+                try
+                {
+                   wait(1000);
+                }
+                catch (InterruptedException e)
+                {
+                   // keep waiting, but remember the interrupt for the caller
+                   interrupted = true;
+                }
+             }
+          }
+          finally
+          {
+             if (interrupted)
+             {
+                Thread.currentThread().interrupt();
+             }
+          }
+
+          if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+          {
+             throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                        "Timed out waiting to receive cluster topology");
+          }
+
+       }
+
+       addFactory(factory);
+
+       return factory;
+    }
+
+ }
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+
+ public boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ public void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ public void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ public void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ public int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ public void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ public int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ public void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ public void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ public int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ public boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ public boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ public void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ public boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ public boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ public void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ public boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ public void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ public int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ public void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ public boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ public void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ public int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ public int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ public void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ public long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ public void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ public double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ public int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ public void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ public void setInitialConnectAttempts(int initialConnectAttempts)
+ {
+ checkWrite();
+ this.initialConnectAttempts = initialConnectAttempts;
+ }
+
+ public int getInitialConnectAttempts()
+ {
+ return initialConnectAttempts;
+ }
+
+ public boolean isFailoverOnInitialConnection()
+ {
+ return this.failoverOnInitialConnection;
+ }
+
+ public void setFailoverOnInitialConnection(final boolean failover)
+ {
+ checkWrite();
+ this.failoverOnInitialConnection = failover;
+ }
+
+ public String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ public TransportConfiguration[] getStaticTransportConfigurations()
+ {
+ return this.initialConnectors;
+ }
+
+ public void setInitialConnectors(TransportConfiguration[] connectors)
+ {
+ this.initialConnectors = connectors;
+ }
+
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+ {
+ return discoveryGroupConfiguration;
+ }
+
+ public List<Interceptor> getInterceptors()
+ {
+ return this.interceptors;
+ }
+
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ public int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ public void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+ public void setGroupID(final String groupID)
+ {
+ checkWrite();
+ this.groupID = groupID;
+ }
+
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
+ */
+ public boolean isCompressLargeMessage()
+ {
+ return compressLargeMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
+ */
+ public void setCompressLargeMessage(boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
+ private void checkWrite()
+ {
+ if (readOnly)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+ }
+ }
+
+ public String getIdentity()
+ {
+ return identity;
+ }
+
+ public void setIdentity(String identity)
+ {
+ this.identity = identity;
+ }
+
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public void setClusterConnection(boolean clusterConnection)
+ {
+ this.clusterConnection = clusterConnection;
+ }
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
+ /**
+  * Closes the locator when it is garbage collected, unless the finalize check has been
+  * disabled. {@code super.finalize()} runs even if closing fails.
+  */
+ @Override
+ protected void finalize() throws Throwable
+ {
+    try
+    {
+       if (finalizeCheck)
+       {
+          close();
+       }
+    }
+    finally
+    {
+       super.finalize();
+    }
+ }
+
+ public void cleanup()
+ {
+ doClose(false);
+ }
+
+ public void close()
+ {
+ doClose(true);
+ }
+
+ protected abstract void doCloseInternal();
+
+ /**
+  * Closes this locator: marks it as closing, lets the subclass release its own resources,
+  * closes (or cleans up) every registered session factory, shuts down the thread pools this
+  * locator created itself and finally marks the locator as closed and writable again.
+  * <p>
+  * Calling it on an already closed locator does nothing. If the calling thread is interrupted
+  * while waiting for a pool, the wait for that pool ends and the interrupt status is restored
+  * before returning.
+  *
+  * @param sendClose {@code true} to call {@code close()} on each factory,
+  *                  {@code false} to call {@code cleanup()}
+  */
+ protected void doClose(final boolean sendClose)
+ {
+    if (closed)
+    {
+       if (log.isDebugEnabled())
+       {
+          log.debug(this + " is already closed when calling closed");
+       }
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug(this + " is calling close", new Exception("trace"));
+    }
+
+    closing = true;
+
+    doCloseInternal();
+
+    // Work on a copy: closing a factory calls back into factoryClosed(), which modifies the set.
+    // NOTE(review): the copy is taken without holding the monitor that addFactory() and
+    // factoryClosed() use -- confirm no other thread can modify the set at this point.
+    Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+
+    for (ClientSessionFactory factory : clonedFactory)
+    {
+       if (sendClose)
+       {
+          factory.close();
+       }
+       else
+       {
+          factory.cleanup();
+       }
+    }
+
+    factories.clear();
+
+    boolean interrupted = false;
+
+    if (shutdownPool)
+    {
+       if (threadPool != null)
+       {
+          threadPool.shutdown();
+
+          try
+          {
+             if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+             {
+                log.warn("Timed out waiting for pool to terminate");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             interrupted = true;
+          }
+       }
+
+       if (scheduledThreadPool != null)
+       {
+          scheduledThreadPool.shutdown();
+
+          try
+          {
+             if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+             {
+                log.warn("Timed out waiting for scheduled pool to terminate");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             interrupted = true;
+          }
+       }
+    }
+    readOnly = false;
+
+    closed = true;
+
+    // Restore the interrupt only after the close has completed, so the second pool still
+    // gets its full wait and the caller can still observe the interrupt.
+    if (interrupted)
+    {
+       Thread.currentThread().interrupt();
+    }
+ }
+
+ /**
+  * This is directly called when the connection to the node is gone,
+  * or when the node sends a disconnection.
+  * Look for callers of this method!
+  * <p>
+  * Removes the node from the topology. If the topology becomes empty, or only this locator's
+  * own node is left, the locator goes back to the "no topology received" state.
+  * Does nothing when the locator has no topology.
+  *
+  * @param eventTime the time of the event, passed on to the topology
+  * @param nodeID    the ID of the node that went down
+  */
+ public void notifyNodeDown(final long eventTime, final String nodeID)
+ {
+
+    if (topology == null)
+    {
+       // there's no topology here
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+    }
+
+    if (topology.removeMember(eventTime, nodeID))
+    {
+       if (topology.isEmpty())
+       {
+          // Resetting the topology to its original condition as it was brand new
+          synchronized (this)
+          {
+             topologyArray = null;
+             receivedTopology = false;
+          }
+       }
+       else
+       {
+          updateArraysAndPairs();
+
+          if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+          {
+             // Resetting the topology to its original condition as it was brand new.
+             // Written under the monitor like every other write to receivedTopology, so the
+             // thread waiting in createSessionFactory() sees the change.
+             synchronized (this)
+             {
+                receivedTopology = false;
+             }
+          }
+       }
+    }
+
+ }
+
+ /**
+  * Records a node announcement in the topology. When the topology accepts the update and the
+  * stored member has both connectors set, every registered factory is given that connector
+  * pair, and the cached topology array is rebuilt. When {@code last} is set, the topology is
+  * flagged as received and threads waiting on this locator are woken up.
+  * Does nothing when the locator has no topology.
+  *
+  * @param uniqueEventID the event ID, passed on to the topology
+  * @param nodeID        the ID of the announced node
+  * @param connectorPair the connector pair of the announced node
+  * @param last          whether this is the last announcement of a batch
+  */
+ public void notifyNodeUp(long uniqueEventID,
+                          final String nodeID,
+                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                          final boolean last)
+ {
+    if (topology == null)
+    {
+       // there's no topology
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
+    }
+
+    final TopologyMember announced = new TopologyMember(connectorPair.a, connectorPair.b);
+    final boolean accepted = topology.updateMember(uniqueEventID, nodeID, announced);
+
+    if (accepted)
+    {
+       final TopologyMember stored = topology.getMember(nodeID);
+
+       if (stored != null)
+       {
+          final Pair<TransportConfiguration, TransportConfiguration> connector = stored.getConnector();
+
+          if (connector.a != null && connector.b != null)
+          {
+             for (ClientSessionFactoryInternal sessionFactory : factories)
+             {
+                sessionFactory.setBackupConnector(connector.a, connector.b);
+             }
+          }
+       }
+
+       updateArraysAndPairs();
+    }
+
+    if (!last)
+    {
+       return;
+    }
+
+    synchronized (this)
+    {
+       receivedTopology = true;
+       // Wake up createSessionFactory() if it is waiting for the topology
+       notifyAll();
+    }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+    // Same text as before: the identity part is only present when an identity is set.
+    final StringBuilder text = new StringBuilder("ServerLocatorImpl ");
+
+    if (identity != null)
+    {
+       text.append("(identity=").append(identity).append(") ");
+    }
+
+    text.append("[initialConnectors=")
+        .append(Arrays.toString(initialConnectors))
+        .append(", discoveryGroupConfiguration=")
+        .append(discoveryGroupConfiguration)
+        .append("]");
+
+    return text.toString();
+ }
+
+ /**
+  * Rebuilds {@code topologyArray} from the members currently held by the topology,
+  * one connector pair per member.
+  */
+ private synchronized void updateArraysAndPairs()
+ {
+    final Collection<TopologyMember> members = topology.getMembers();
+    final int size = members.size();
+
+    topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, size);
+
+    int slot = 0;
+    for (TopologyMember member : members)
+    {
+       topologyArray[slot++] = member.getConnector();
+    }
+ }
+
+ /**
+  * Replaces the initial connectors with the entries currently known to the discovery group and
+  * adds every node that is not yet in the topology. For a cluster connection that has not
+  * received a topology yet, a connection attempt is made; a failure of that attempt is logged
+  * and otherwise ignored.
+  */
+ public synchronized void connectorsChanged()
+ {
+    // NOTE(review): discoveryGroup is a private field that is never assigned anywhere in this
+    // class, so this call fails with a NullPointerException unless a subclass overrides the
+    // method -- confirm where the discovery entries are supposed to come from.
+    List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+    this.initialConnectors = new TransportConfiguration[newConnectors.size()];
+
+    int count = 0;
+    for (DiscoveryEntry entry : newConnectors)
+    {
+       this.initialConnectors[count++] = entry.getConnector();
+
+       if (topology != null && topology.getMember(entry.getNodeID()) == null)
+       {
+          TopologyMember member = new TopologyMember(entry.getConnector(), null);
+          // on this case we set it as zero as any update coming from server should be accepted
+          topology.updateMember(0, entry.getNodeID(), member);
+       }
+    }
+
+    if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
+    {
+       // FIXME the node is alone in the cluster. We create a connection to the new node
+       // to trigger the node notification to form the cluster.
+       try
+       {
+          connect();
+       }
+       catch (Exception e)
+       {
+          // Best effort: report through the class logger instead of printing to stderr.
+          log.warn("Could not connect to the newly discovered node(s)", e);
+       }
+    }
+ }
+
+ /**
+  * Unregisters a closed factory. When the last factory of a non-cluster connection is gone,
+  * the received topology is dropped so the locator goes back to the broadcast or static list.
+  *
+  * @param factory the factory that was closed
+  */
+ public synchronized void factoryClosed(final ClientSessionFactory factory)
+ {
+    factories.remove(factory);
+
+    if (clusterConnection || !factories.isEmpty())
+    {
+       return;
+    }
+
+    // Go back to using the broadcast or static list
+    topologyArray = null;
+    receivedTopology = false;
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ // Registers the listener with the topology of this locator.
+ // NOTE(review): topology is null when the locator was created through one of the
+ // (boolean useHA, ...) constructors with useHA == false; this call would then fail with a
+ // NullPointerException -- confirm whether non-HA locators are expected to accept listeners.
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topology.addClusterTopologyListener(listener);
+ }
+
+ // Unregisters the listener from the topology of this locator.
+ // NOTE(review): topology is null when the locator was created through one of the
+ // (boolean useHA, ...) constructors with useHA == false; this call would then fail with a
+ // NullPointerException -- confirm whether non-HA locators are expected to reach this method.
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topology.removeClusterTopologyListener(listener);
+ }
+
+ /**
+  * Registers a session factory with this locator after telling it which backup connector the
+  * topology knows for its connector (none when there is no topology).
+  * A {@code null} factory is ignored.
+  *
+  * @param factory the factory to register, may be {@code null}
+  */
+ public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ {
+    if (factory == null)
+    {
+       return;
+    }
+
+    final TransportConfiguration backupConnector = topology == null ? null
+                                                                    : topology.getBackupForConnector(factory.getConnectorConfiguration());
+
+    factory.setBackupConnector(factory.getConnectorConfiguration(), backupConnector);
+    factories.add(factory);
+ }
+
+}
Deleted: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -1,1714 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
- private static final long serialVersionUID = -1615857864410205260L;
-
- private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
- private final boolean ha;
-
- private boolean finalizeCheck = true;
-
- private boolean clusterConnection;
-
- private transient String identity;
-
- private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
-
- private TransportConfiguration[] initialConnectors;
-
- private DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
- private StaticConnector staticConnector = new StaticConnector();
-
- private final Topology topology;
-
- private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
- private boolean receivedTopology;
-
- private boolean compressLargeMessage;
-
- // if the system should shutdown the pool when shutting down
- private transient boolean shutdownPool;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- private boolean readOnly;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialConnectAttempts;
-
- private boolean failoverOnInitialConnection;
-
- private int initialMessagePacketSize;
-
- private volatile boolean closed;
-
- private volatile boolean closing;
-
- private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private Executor startExecutor;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private AfterConnectInternalListener afterConnectListener;
-
- private String groupID;
-
- private String nodeID;
-
- private TransportConfiguration clusterTransportConfiguration;
-
- private boolean backup;
-
- private final Exception e = new Exception();
-
- // To be called when there are ServerLocator being finalized.
- // To be used on test assertions
- public static Runnable finalizeCallback = null;
-
- public static synchronized void clearThreadPools()
- {
-
- if (globalThreadPool != null)
- {
- globalThreadPool.shutdown();
- try
- {
- if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Couldn't finish the globalThreadPool");
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- globalThreadPool = null;
- }
- }
-
- if (globalScheduledThreadPool != null)
- {
- globalScheduledThreadPool.shutdown();
- try
- {
- if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- globalScheduledThreadPool = null;
- }
- }
- }
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (globalThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
- globalThreadPool = Executors.newCachedThreadPool(factory);
- }
-
- return globalThreadPool;
- }
-
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
-
- globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
- factory);
- }
-
- return globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (threadPool != null)
- {
- return;
- }
- else if (useGlobalPools)
- {
- threadPool = getGlobalThreadPool();
-
- scheduledThreadPool = getGlobalScheduledThreadPool();
- }
- else
- {
- this.shutdownPool = true;
-
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- return null;
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
- });
- }
-
- private synchronized void initialise() throws Exception
- {
- if (!readOnly)
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryGroupConfiguration != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
- InetAddress lbAddress;
-
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
-
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- readOnly = true;
- }
- }
-
- private ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final DiscoveryGroupConfiguration discoveryGroupConfiguration,
- final TransportConfiguration[] transportConfigs)
- {
- e.fillInStackTrace();
-
- this.topology = topology;
-
- this.ha = useHA;
-
- this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
- this.initialConnectors = transportConfigs;
-
- this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
- failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
- clusterConnection = false;
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
- if (useHA)
- {
- // We only set the owner at where the Topology was created.
- // For that reason we can't set it at the main constructor
- topology.setOwner(this);
- }
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
- {
- this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
- if (useHA)
- {
- // We only set the owner at where the Topology was created.
- // For that reason we can't set it at the main constructor
- topology.setOwner(this);
- }
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(topology, useHA, groupConfiguration, null);
-
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final TransportConfiguration... transportConfigs)
- {
- this(topology, useHA, null, transportConfigs);
- }
-
- private TransportConfiguration selectConnector()
- {
- if (receivedTopology)
- {
- int pos = loadBalancingPolicy.select(topologyArray.length);
-
- Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
- return pair.a;
- }
- else
- {
- // Get from initialconnectors
-
- int pos = loadBalancingPolicy.select(initialConnectors.length);
-
- return initialConnectors[pos];
- }
- }
-
- public void start(Executor executor) throws Exception
- {
- initialise();
-
- this.startExecutor = executor;
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- connect();
- }
- catch (Exception e)
- {
- if (!closing)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
- }
-
- public Executor getExecutor()
- {
- return startExecutor;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
- */
- public void disableFinalizeCheck()
- {
- finalizeCheck = false;
- }
-
- public ClientSessionFactoryInternal connect() throws Exception
- {
- ClientSessionFactoryInternal sf;
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
- {
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
- }
- // wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
- */
- public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
- {
- this.afterConnectListener = listener;
- }
-
- public AfterConnectInternalListener getAfterConnectInternalListener()
- {
- return afterConnectListener;
- }
-
- public boolean isClosed()
- {
- return closed || closing;
- }
-
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
- addFactory(factory);
-
- return factory;
- }
-
- public ClientSessionFactory createSessionFactory() throws Exception
- {
- if (closed || closing)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- if (initialConnectors == null && discoveryGroup != null)
- {
- // Wait for an initial broadcast to give us at least one node in the cluster
- long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
- boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
- }
- }
-
- ClientSessionFactoryInternal factory = null;
-
- synchronized (this)
- {
- boolean retry;
- int attempts = 0;
- do
- {
- retry = false;
-
- TransportConfiguration tc = selectConnector();
-
- // try each factory in the list until we find one which works
-
- try
- {
- factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- }
- catch (HornetQException e)
- {
- factory.close();
- factory = null;
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- attempts++;
-
- if (topologyArray != null && attempts == topologyArray.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- retry = true;
- }
- else
- {
- throw e;
- }
- }
- }
- while (retry);
-
- if (ha || clusterConnection)
- {
- long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
- !receivedTopology &&
- timeout > System.currentTimeMillis())
- {
- // Now wait for the topology
-
- try
- {
- wait(1000);
- }
- catch (InterruptedException ignore)
- {
- }
-
- }
-
- if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
- }
-
- }
-
- addFactory(factory);
-
- return factory;
- }
-
- }
-
- public boolean isHA()
- {
- return ha;
- }
-
- public boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public long getCallTimeout()
- {
- return callTimeout;
- }
-
- public void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public long getRetryInterval()
- {
- return retryInterval;
- }
-
- public void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public void setInitialConnectAttempts(int initialConnectAttempts)
- {
- checkWrite();
- this.initialConnectAttempts = initialConnectAttempts;
- }
-
- public int getInitialConnectAttempts()
- {
- return initialConnectAttempts;
- }
-
- public boolean isFailoverOnInitialConnection()
- {
- return this.failoverOnInitialConnection;
- }
-
- public void setFailoverOnInitialConnection(final boolean failover)
- {
- checkWrite();
- this.failoverOnInitialConnection = failover;
- }
-
- public String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public TransportConfiguration[] getStaticTransportConfigurations()
- {
- return this.initialConnectors;
- }
-
- public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
- {
- return discoveryGroupConfiguration;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public void setGroupID(final String groupID)
- {
- checkWrite();
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
- public boolean isCompressLargeMessage()
- {
- return compressLargeMessage;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
- public void setCompressLargeMessage(boolean compress)
- {
- this.compressLargeMessage = compress;
- }
-
- private void checkWrite()
- {
- if (readOnly)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
- }
- }
-
- public String getIdentity()
- {
- return identity;
- }
-
- public void setIdentity(String identity)
- {
- this.identity = identity;
- }
-
- public void setNodeID(String nodeID)
- {
- this.nodeID = nodeID;
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public void setClusterConnection(boolean clusterConnection)
- {
- this.clusterConnection = clusterConnection;
- }
-
- public boolean isClusterConnection()
- {
- return clusterConnection;
- }
-
- public TransportConfiguration getClusterTransportConfiguration()
- {
- return clusterTransportConfiguration;
- }
-
- public void setClusterTransportConfiguration(TransportConfiguration tc)
- {
- this.clusterTransportConfiguration = tc;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public void setBackup(boolean backup)
- {
- this.backup = backup;
- }
-
- @Override
- protected void finalize() throws Throwable
- {
- if (finalizeCheck)
- {
- close();
- }
-
- super.finalize();
- }
-
- public void cleanup()
- {
- doClose(false);
- }
-
- public void close()
- {
- doClose(true);
- }
-
- protected void doClose(final boolean sendClose)
- {
- if (closed)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + " is already closed when calling closed");
- }
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " is calling close", new Exception("trace"));
- }
-
- closing = true;
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- log.error("Failed to stop discovery group", e);
- }
- }
- else
- {
- staticConnector.disconnect();
- }
-
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
- for (ClientSessionFactory factory : clonedFactory)
- {
- if (sendClose)
- {
- factory.close();
- }
- else
- {
- factory.cleanup();
- }
- }
-
- factories.clear();
-
- if (shutdownPool)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for scheduled pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- readOnly = false;
-
- closed = true;
- }
-
- /** This is directly called when the connection to the node is gone,
- * or when the node sends a disconnection.
- * Look for callers of this method! */
- public void notifyNodeDown(final long eventTime, final String nodeID)
- {
-
- if (topology == null)
- {
- // there's no topology here
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
- }
-
- if (topology.removeMember(eventTime, nodeID))
- {
- if (topology.isEmpty())
- {
- // Resetting the topology to its original condition as it was brand new
- synchronized (this)
- {
- topologyArray = null;
- receivedTopology = false;
- }
- }
- else
- {
- updateArraysAndPairs();
-
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
- // Resetting the topology to its original condition as it was brand new
- receivedTopology = false;
- }
- }
- }
-
- }
-
- public void notifyNodeUp(long uniqueEventID,
- final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
- {
- if (topology == null)
- {
- // there's no topology
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
- }
-
- TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
-
- if (topology.updateMember(uniqueEventID, nodeID, member))
- {
-
- TopologyMember actMember = topology.getMember(nodeID);
-
- if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
- {
- for (ClientSessionFactory factory : factories)
- {
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
- actMember.getConnector().b);
- }
- }
-
- updateArraysAndPairs();
- }
-
- if (last)
- {
- synchronized (this)
- {
- receivedTopology = true;
- // Notify if waiting on getting topology
- notifyAll();
- }
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- if (identity != null)
- {
- return "ServerLocatorImpl (identity=" + identity +
- ") [initialConnectors=" +
- Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
- else
- {
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
- }
-
- private synchronized void updateArraysAndPairs()
- {
- Collection<TopologyMember> membersCopy = topology.getMembers();
-
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- membersCopy.size());
-
- int count = 0;
- for (TopologyMember pair : membersCopy)
- {
- topologyArray[count++] = pair.getConnector();
- }
- }
-
- public synchronized void connectorsChanged()
- {
- List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- newConnectors.size());
-
- int count = 0;
- for (DiscoveryEntry entry : newConnectors)
- {
- this.initialConnectors[count++] = entry.getConnector();
-
- if (topology != null && topology.getMember(entry.getNodeID()) == null)
- {
- TopologyMember member = new TopologyMember(entry.getConnector(), null);
- // on this case we set it as zero as any update coming from server should be accepted
- topology.updateMember(0, entry.getNodeID(), member);
- }
- }
-
- if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
- {
- // FIXME the node is alone in the cluster. We create a connection to the new node
- // to trigger the node notification to form the cluster.
- try
- {
- connect();
- }
- catch (Exception e)
- {
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (!clusterConnection && factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topologyArray = null;
- }
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
- public void addClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topology.addClusterTopologyListener(listener);
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topology.removeClusterTopologyListener(listener);
- }
-
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
- {
- if (factory != null)
- {
- TransportConfiguration backup = null;
-
- if (topology != null)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
-
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
- }
- }
-
- class StaticConnector implements Serializable
- {
- private static final long serialVersionUID = 6772279632415242634l;
-
- private List<Connector> connectors;
-
- public ClientSessionFactory connect() throws HornetQException
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactory csf = null;
-
- createConnectors();
-
- try
- {
-
- int retryNumber = 0;
- while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
- {
- retryNumber++;
- for (Connector conn : connectors)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::Submitting connect towards " + conn);
- }
-
- csf = conn.tryConnect();
-
- if (csf != null)
- {
- csf.getConnection().addFailureListener(new FailureListener()
- {
- // Case the node where the cluster connection was connected is gone, we need to restart the
- // connection
- public void connectionFailed(HornetQException exception, boolean failedOver)
- {
- if (clusterConnection && exception.getCode() == HornetQException.DISCONNECTED)
- {
- try
- {
- ServerLocatorImpl.this.start(startExecutor);
- }
- catch (Exception e)
- {
- // There isn't much to be done if this happens here
- log.warn(e.getMessage());
- }
- }
- }
- });
-
- if (log.isDebugEnabled())
- {
- log.debug("Returning " + csf +
- " after " +
- retryNumber +
- " retries on StaticConnector " +
- ServerLocatorImpl.this);
- }
-
- return csf;
- }
- }
-
- if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts)
- {
- break;
- }
-
- if (!closed && !closing)
- {
- Thread.sleep(retryInterval);
- }
- }
-
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
- }
-
- if (csf == null && !closed)
- {
- log.warn("Failed to connecto to any static connector, throwing exception now");
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- if (log.isDebugEnabled())
- {
- log.debug("Returning " + csf + " on " + ServerLocatorImpl.this);
- }
- return csf;
- }
-
- private synchronized void createConnectors()
- {
- if (connectors != null)
- {
- for (Connector conn : connectors)
- {
- if (conn != null)
- {
- conn.disconnect();
- }
- }
- }
- connectors = new ArrayList<Connector>();
- for (TransportConfiguration initialConnector : initialConnectors)
- {
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- factory.disableFinalizeCheck();
-
- connectors.add(new Connector(initialConnector, factory));
- }
- }
-
- public synchronized void disconnect()
- {
- if (connectors != null)
- {
- for (Connector connector : connectors)
- {
- connector.disconnect();
- }
- }
- }
-
- public void finalize() throws Throwable
- {
- if (!closed && finalizeCheck)
- {
- log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
- System.identityHashCode(this));
-
- log.warn("The ServerLocator you didn't close was created here:", e);
-
- if (ServerLocatorImpl.finalizeCallback != null)
- {
- ServerLocatorImpl.finalizeCallback.run();
- }
-
- close();
- }
-
- super.finalize();
- }
-
- class Connector
- {
- private TransportConfiguration initialConnector;
-
- private volatile ClientSessionFactoryInternal factory;
-
- private boolean interrupted = false;
-
- private Exception e;
-
- public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
- {
- this.initialConnector = initialConnector;
- this.factory = factory;
- }
-
- public ClientSessionFactory tryConnect() throws HornetQException
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::Trying to connect to " + factory);
- }
- try
- {
- ClientSessionFactoryInternal factoryToUse = factory;
- if (factoryToUse != null)
- {
- factory.connect(1, false);
- }
- return factoryToUse;
- }
- catch (HornetQException e)
- {
- log.debug(this + "::Exception on establish connector initial connection", e);
- return null;
- }
- }
-
- public void disconnect()
- {
- interrupted = true;
-
- if (factory != null)
- {
- factory.causeExit();
- factory.cleanup();
- factory = null;
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- return "Connector [initialConnector=" + initialConnector + "]";
- }
-
- }
- }
-}
Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl, was derived from ServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class SimpleUDPServerLocatorImpl extends AbstractServerLocator
+{
+ private static final Logger log = Logger.getLogger(SimpleUDPServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private InetAddress localBindAddress;
+
+ private InetAddress groupAddress;
+
+ private int groupPort;
+
+ private long refreshTimeout;
+
+ private long initialWaitTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private volatile boolean closing;
+
+ @Override
+ protected synchronized void initialiseInternal() throws Exception
+ {
+ this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+ String lbStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+
+ if (lbStr != null)
+ {
+ this.localBindAddress = InetAddress.getByName(lbStr);
+ }
+ else
+ {
+ this.localBindAddress = null;
+ }
+
+ String gaddr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params);
+ if(gaddr != null)
+ {
+ this.groupAddress = InetAddress.getByName(gaddr);
+ }
+ this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
+ this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+ this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+
+ discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+ this.discoveryGroupName,
+ this.localBindAddress,
+ this.groupAddress,
+ this.groupPort,
+ this.refreshTimeout);
+
+ discoveryGroup.registerListener(this);
+
+ discoveryGroup.start();
+ }
+
+ public SimpleUDPServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+ }
+
+ public ClientSessionFactoryInternal connect() throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+ sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+ addFactory(sf);
+ return sf;
+ }
+
+ @Override
+ protected void waitInitialDiscovery() throws Exception
+ {
+ // Wait for an initial broadcast to give us at least one node in the cluster
+ long timeout = this.isClusterConnection() ? 0 : this.initialWaitTimeout;
+ boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+ if (!ok)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive initial broadcast from cluster");
+ }
+ }
+
+ @Override
+ protected void doCloseInternal()
+ {
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
+ }
+}
Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A StaticServerLocatorImpl, was derived from ServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+ private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+ private StaticConnector staticConnector = new StaticConnector();
+
+ private final Exception e = new Exception();
+
+ @Override
+ protected synchronized void initialiseInternal() throws Exception
+ {
+ /* Nothing special for this class */
+ }
+
+ public StaticServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+
+ Map<String,Object> params = discoveryGroupConfiguration.getParams();
+ List<TransportConfiguration> initialConnectors = (List<TransportConfiguration>)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ setInitialConnectors(initialConnectors.toArray(new TransportConfiguration[0]));
+
+ e.fillInStackTrace();
+ }
+
+ public ClientSessionFactoryInternal connect() throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+
+ sf = (ClientSessionFactoryInternal)staticConnector.connect();
+
+ addFactory(sf);
+ return sf;
+ }
+
+ @Override
+ protected void waitInitialDiscovery()
+ {
+ /* Nothing to do for this class */
+ }
+
+ @Override
+ protected void doCloseInternal()
+ {
+ staticConnector.disconnect();
+ }
+
+ class StaticConnector implements Serializable
+ {
+ private static final long serialVersionUID = 6772279632415242634l;
+
+ private List<Connector> connectors;
+
+ public ClientSessionFactory connect() throws HornetQException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialiseInternal();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactory csf = null;
+
+ createConnectors();
+
+ try
+ {
+
+ int retryNumber = 0;
+ while (csf == null && !isClosed())
+ {
+ retryNumber++;
+ for (Connector conn : connectors)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::Submitting connect towards " + conn);
+ }
+
+ csf = conn.tryConnect();
+
+ if (csf != null)
+ {
+ csf.getConnection().addFailureListener(new FailureListener()
+ {
+ // Case the node where the cluster connection was connected is gone, we need to restart the
+ // connection
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ if (isClusterConnection() && exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ try
+ {
+ StaticServerLocatorImpl.this.start(getExecutor());
+ }
+ catch (Exception e)
+ {
+ // There isn't much to be done if this happens here
+ log.warn(e.getMessage());
+ }
+ }
+ }
+ });
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Returning " + csf +
+ " after " +
+ retryNumber +
+ " retries on StaticConnector " +
+ StaticServerLocatorImpl.this);
+ }
+
+ return csf;
+ }
+ }
+
+ if (getInitialConnectAttempts() >= 0 && retryNumber > getInitialConnectAttempts())
+ {
+ break;
+ }
+
+ if (!isClosed())
+ {
+ Thread.sleep(getRetryInterval());
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+ }
+
+ if (csf == null && !isClosed())
+ {
+ log.warn("Failed to connecto to any static connector, throwing exception now");
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Returning " + csf + " on " + StaticServerLocatorImpl.this);
+ }
+ return csf;
+ }
+
+ private synchronized void createConnectors()
+ {
+ if (connectors != null)
+ {
+ for (Connector conn : connectors)
+ {
+ if (conn != null)
+ {
+ conn.disconnect();
+ }
+ }
+ }
+ connectors = new ArrayList<Connector>();
+ for (TransportConfiguration initialConnector : getStaticTransportConfigurations())
+ {
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+ initialConnector,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ factory.disableFinalizeCheck();
+
+ connectors.add(new Connector(initialConnector, factory));
+ }
+ }
+
+ public synchronized void disconnect()
+ {
+ if (connectors != null)
+ {
+ for (Connector connector : connectors)
+ {
+ connector.disconnect();
+ }
+ }
+ }
+
+ public void finalize() throws Throwable
+ {
+ if (!isClosed() && doFinalizeCheck())
+ {
+ log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+
+ log.warn("The ServerLocator you didn't close was created here:", e);
+
+ if (StaticServerLocatorImpl.finalizeCallback != null)
+ {
+ StaticServerLocatorImpl.finalizeCallback.run();
+ }
+
+ close();
+ }
+
+ super.finalize();
+ }
+
+ class Connector
+ {
+ private TransportConfiguration initialConnector;
+
+ private volatile ClientSessionFactoryInternal factory;
+
+ private boolean interrupted = false;
+
+ private Exception e;
+
+ public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+ {
+ this.initialConnector = initialConnector;
+ this.factory = factory;
+ }
+
+ public ClientSessionFactory tryConnect() throws HornetQException
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::Trying to connect to " + factory);
+ }
+ try
+ {
+ ClientSessionFactoryInternal factoryToUse = factory;
+ if (factoryToUse != null)
+ {
+ factory.connect(1, false);
+ }
+ return factoryToUse;
+ }
+ catch (HornetQException e)
+ {
+ log.debug(this + "::Exception on establish connector initial connection", e);
+ return null;
+ }
+ }
+
+ public void disconnect()
+ {
+ interrupted = true;
+
+ if (factory != null)
+ {
+ factory.causeExit();
+ factory.cleanup();
+ factory = null;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "Connector [initialConnector=" + initialConnector + "]";
+ }
+
+ }
+ }
+}
13 years, 3 months
JBoss hornetq SVN: r11396 - branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-22 11:30:57 -0400 (Thu, 22 Sep 2011)
New Revision: 11396
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-22 01:37:18 UTC (rev 11395)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-22 15:30:57 UTC (rev 11396)
@@ -19,9 +19,12 @@
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.TextMessage;
import junit.framework.Assert;
@@ -1158,24 +1161,18 @@
public void testRedeliveryWithClientAck() throws Exception
{
+ connV11.connect(defUser, defPass);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
+ this.subscribe(connV11, "subId", "client");
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ sendMessage(getName());
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertTrue(frame.getCommand().equals("MESSAGE"));
- sendFrame(frame);
+ connV11.disconnect();
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
@@ -1183,6 +1180,37 @@
Assert.assertTrue(message.getJMSRedelivered());
}
+ public void testSendManyMessages() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message arg0)
+ {
+ latch.countDown();
+ }
+ });
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ for (int i = 1; i <= count; i++)
+ {
+ connV11.sendFrame(frame);
+ }
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ connV11.disconnect();
+ }
+
+
//-----------------private help methods
private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
13 years, 3 months
JBoss hornetq SVN: r11395 - in branches/HORNETQ-316_for_2_2_EAP: examples/common/config and 59 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-09-21 21:37:18 -0400 (Wed, 21 Sep 2011)
New Revision: 11395
Added:
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
branches/HORNETQ-316_for_2_2_EAP/
branches/HORNETQ-316_for_2_2_EAP/examples/common/config/hornetq-example-beans.xml
branches/HORNETQ-316_for_2_2_EAP/examples/common/src/org/hornetq/common/example/SpawnedVMSupport.java
branches/HORNETQ-316_for_2_2_EAP/examples/core/microcontainer/
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/common/config/ant.properties
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/ejb-jms-transaction/build.xml
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/jca-remote/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/jca-remote/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/mdb-cmt-setrollbackonly/readme.html
branches/HORNETQ-316_for_2_2_EAP/examples/javaee/mdb-cmt-tx-required/readme.html
branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-beans.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/dead-letter/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/durable-subscription/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/durable-subscription/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/paging/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/paging/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue-requestor/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue-requestor/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/request-reply/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/request-reply/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/temp-queue/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/temp-queue/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example1/
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example1/src/org/hornetq/jms/example/TopicSelectorExample1.java
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example2/
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example2/src/org/hornetq/jms/example/TopicSelectorExample2.java
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic/server0/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/transactional/server0/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/examples/jms/xa-send/
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/buffers/
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/DiscoveryListener.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/journal/LoaderCallback.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalReaderCallback.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/management/ManagementService.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/utils/UTF8Util.java
branches/HORNETQ-316_for_2_2_EAP/tests/config/ConfigurationTest-defaults.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-beans.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-configuration.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-jms.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-queues.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-users.xml
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/jndi.properties
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AutogroupIdTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/CommitRollbackTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/DeliveryOrderTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageCounterTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageHandlerTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageRateTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerCloseTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/QueueBrowserTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ReceiveTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RoutingTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCloseTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateAndDeleteQueueTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateConsumerTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateProducerTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TransactionalSendTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/management/SecurityManagementTestBase.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
Log:
merge r11373-r11394 from branches/Branch_2_2_EAP
Property changes on: branches/HORNETQ-316_for_2_2_EAP
___________________________________________________________________
Added: svn:mergeinfo
+ /branches/Branch_2_2_EAP:11373-11394
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/common/config/hornetq-example-beans.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/common/src/org/hornetq/common/example/SpawnedVMSupport.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/core/microcontainer
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/common/config/ant.properties
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/ejb-jms-transaction/build.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/jca-remote/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/jca-remote/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/mdb-cmt-setrollbackonly/readme.html
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/javaee/mdb-cmt-tx-required/readme.html
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-beans.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/browser/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/dead-letter/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/durable-subscription/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/durable-subscription/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/paging/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/paging/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue-requestor/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/queue-requestor/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/request-reply/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/request-reply/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/temp-queue/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/temp-queue/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic/server0/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example1
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example1/src/org/hornetq/jms/example/TopicSelectorExample1.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example2
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/topic-selector-example2/src/org/hornetq/jms/example/TopicSelectorExample2.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/transactional/server0/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/examples/jms/xa-send
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -135,6 +135,12 @@
public static final int DUPLICATE_ID_REJECTED = 113;
+ /**
+ * The session metadata being added had already been set
+ */
+ public static final int DUPLICATE_METADATA = 114;
+
+
// Native Error codes ----------------------------------------------
/**
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -569,6 +569,15 @@
void addMetaData(String key, String data) throws HornetQException;
/**
+ * Attaches metadata to the session, throwing an exception if the same metadata has already been set.
+ * You can use this to ensure that no other session holds the same metadata you are passing as an argument.
+ * This is useful to simulate unique client IDs, where you want to prevent multiple instances of your client application from being connected.
+ *
+ * @throws HornetQException
+ */
+ void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
* Attach any metadata to the session.
* Sends a Metadata using the older version
* @deprecated Use {@link ClientSession#addMetaData(String, String)}
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/buffers
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -65,6 +65,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1146,6 +1147,11 @@
}
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
+
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
+ }
public ClientSessionFactoryInternal getSessionFactory()
{
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -593,4 +593,13 @@
return session.getChannel();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ session.addUniqueMetaData(key, data);
+
+ }
+
}
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -165,7 +165,7 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
-
+
private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -549,7 +549,7 @@
}
});
}
-
+
public Executor getExecutor()
{
return startExecutor;
@@ -592,7 +592,7 @@
{
return afterConnectListener;
}
-
+
public boolean isClosed()
{
return closed || closing;
@@ -1115,7 +1115,7 @@
{
return identity;
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1287,15 +1287,17 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
-
- if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
+
+ if (topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
// Resetting the topology to its original condition as it was brand new
- topologyArray = null;
-
- receivedTopology = false;
+ synchronized (this)
+ {
+ topologyArray = null;
+ receivedTopology = false;
+ }
}
else
{
@@ -1406,7 +1408,7 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
-
+
if (topology != null && topology.getMember(entry.getNodeID()) == null)
{
TopologyMember member = new TopologyMember(entry.getConnector(), null);
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/DiscoveryListener.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -171,13 +171,20 @@
synchronized (waitLock)
{
- waitLock.notify();
+ waitLock.notifyAll();
}
- socket.close();
+ try
+ {
+ socket.close();
+
+ socket = null;
+ }
+ catch (Throwable ignored)
+ {
+ log.warn(ignored.toString(), ignored);
+ }
- socket = null;
-
try
{
thread.interrupt();
@@ -393,7 +400,7 @@
{
received = true;
- waitLock.notify();
+ waitLock.notifyAll();
}
}
}
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/journal/LoaderCallback.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalReaderCallback.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -79,6 +79,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+ {
+ SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage)packet;
+ if (session.addUniqueMetaData(message.getKey(), message.getData()))
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.DUPLICATE_METADATA,
+ "Metadata " + message.getKey() +
+ "=" +
+ message.getData() +
+ " had been set already"));
+ }
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -44,6 +44,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -141,6 +142,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -538,6 +540,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case SESS_UNIQUE_ADD_METADATA:
+ {
+ packet = new SessionUniqueAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -181,13 +181,17 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC = 103;
-
- // HA
public static final byte SESS_ADD_METADATA = 104;
public static final byte SESS_ADD_METADATA2 = 105;
+ public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+
+
+ // HA
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
@@ -200,6 +204,7 @@
public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -30,7 +30,7 @@
private String key;
private String data;
/**
- * It won require confirmation during failover / reconnect
+ * It does not require confirmation during failover / reconnect
*/
private boolean requiresConfirmation = true;
@@ -39,6 +39,11 @@
super(PacketImpl.SESS_ADD_METADATA2);
}
+ protected SessionAddMetaDataMessageV2(byte packetCode)
+ {
+ super(packetCode);
+ }
+
public SessionAddMetaDataMessageV2(String k, String d)
{
this();
@@ -46,6 +51,13 @@
data = d;
}
+ protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+ {
+ super(packetCode);
+ key = k;
+ data = d;
+ }
+
public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
{
this();
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Copied: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java (from rev 11394, branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java)
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionUniqueAddMetaDataMessage()
+ {
+ super(SESS_UNIQUE_ADD_METADATA);
+ }
+
+
+ public SessionUniqueAddMetaDataMessage(String key, String data)
+ {
+ super(SESS_UNIQUE_ADD_METADATA, key, data);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -65,7 +65,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
@@ -95,7 +95,7 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -113,7 +113,7 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
-
+
this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -136,13 +136,13 @@
this.scheduledThreadPool = scheduledThreadPool;
- this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.CORE,
+ new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
// difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
- this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.STOMP,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -168,9 +168,9 @@
// This needs to be a different thread pool to the main thread pool especially for OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept small for better performance
- ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() + "-" + System.identityHashCode(this),
- false,
- tccl);
+ ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() +
+ "-" +
+ System.identityHashCode(this), false, tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -275,7 +275,6 @@
}
failureCheckAndFlushThread.close();
-
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
@@ -297,7 +296,7 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
if (log.isTraceEnabled())
{
log.trace("Sending connection.disconnection packet to " + conn);
@@ -398,23 +397,23 @@
{
log.trace("Connection created " + connection);
}
-
+
connections.put(connection.getID(), entry);
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
- if (isTrace)
- {
- log.trace("Connection removed " + connectionID + " from server " + this.server, new Exception ("trace"));
- }
-
+ if (isTrace)
+ {
+ log.trace("Connection removed " + connectionID + " from server " + this.server, new Exception("trace"));
+ }
+
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
@@ -459,7 +458,7 @@
// Connections should only fail when TTL is exceeded
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -496,10 +495,10 @@
}
else
{
- if (log.isTraceEnabled())
- {
- log.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet");
- }
+ if (log.isTraceEnabled())
+ {
+ log.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet");
+ }
}
}
}
@@ -540,76 +539,85 @@
{
while (!closed)
{
- long now = System.currentTimeMillis();
+ try
+ {
+ long now = System.currentTimeMillis();
- Set<Object> idsToRemove = new HashSet<Object>();
+ Set<Object> idsToRemove = new HashSet<Object>();
- for (ConnectionEntry entry : connections.values())
- {
- RemotingConnection conn = entry.connection;
+ for (ConnectionEntry entry : connections.values())
+ {
+ RemotingConnection conn = entry.connection;
- boolean flush = true;
+ boolean flush = true;
- if (entry.ttl != -1)
- {
- if (now >= entry.lastCheck + entry.ttl)
+ if (entry.ttl != -1)
{
- if (!conn.checkDataReceived())
+ if (now >= entry.lastCheck + entry.ttl)
{
- idsToRemove.add(conn.getID());
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
- flush = false;
+ flush = false;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
}
- else
- {
- entry.lastCheck = now;
- }
}
+
+ if (flush)
+ {
+ conn.flush();
+ }
}
- if (flush)
+ for (Object id : idsToRemove)
{
- conn.flush();
+ RemotingConnection conn = removeConnection(id);
+ if (conn != null)
+ {
+ HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Did not receive data from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. " +
+ "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+ "Please check user manual for more information." +
+ " The connection will now be closed.");
+ conn.fail(me);
+ }
}
- }
- for (Object id : idsToRemove)
- {
- RemotingConnection conn = removeConnection(id);
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from " + conn.getRemoteAddress() +
- ". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. " +
- "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
- "Please check user manual for more information." +
- " The connection will now be closed.");
- conn.fail(me);
- }
+ long start = System.currentTimeMillis();
- synchronized (this)
- {
- long toWait = pauseInterval;
-
- long start = System.currentTimeMillis();
-
- while (!closed && toWait > 0)
- {
- try
+ while (!closed && toWait > 0)
{
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
}
}
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -132,6 +132,9 @@
List<ServerSession> getSessions(String connectionID);
+ /** Returns true if any session has the given metadata key set to the given value. */
+ boolean lookupSession(String metakey, String metavalue);
+
ClusterManager getClusterManager();
SimpleString getNodeID();
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -121,6 +121,8 @@
void addMetaData(String key, String data);
+ boolean addUniqueMetaData(String key, String data);
+
String getMetaData(String key);
String[] getTargetAddresses();
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -797,6 +797,23 @@
sessions.remove(name);
}
+ public boolean lookupSession(String key, String value)
+ {
+ // getSessions is called here in an attempt to minimize locking the server while this check is being done
+ Set<ServerSession> allSessions = getSessions();
+
+ for (ServerSession session : allSessions)
+ {
+ String metaValue = session.getMetaData(key);
+ if (metaValue != null && metaValue.equals(value))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -1194,6 +1194,21 @@
metaData.put(key, data);
}
+
+ public boolean addUniqueMetaData(String key, String data)
+ {
+ if (server.lookupSession(key, data))
+ {
+ // There is a duplication of this property
+ return false;
+ }
+ else
+ {
+ addMetaData(key, data);
+ return true;
+ }
+ }
+
public String getMetaData(String key)
{
String data = null;
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/management/ManagementService.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -181,6 +181,18 @@
{
throw new IllegalStateException("setClientID can only be called directly after the connection is created");
}
+
+ try
+ {
+ initialSession.addUniqueMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+ {
+ throw new IllegalStateException("clientID=" + clientID + " was already set into another connection");
+ }
+ }
this.clientID = clientID;
try
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/utils/UTF8Util.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/config/ConfigurationTest-defaults.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-beans.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-configuration.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-jms.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-queues.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/hornetq-users.xml
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/config/jndi.properties
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -88,8 +88,21 @@
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+
+ Connection connection2 = JMSTest.cf.createConnection();
+ try
+ {
+ connection2.setClientID(clientID);
+ fail("setClientID was expected to throw an exception");
+ }
+ catch (JMSException e)
+ {
+ // expected
+ }
connection.close();
+
+ connection2.setClientID(clientID);
}
public void testSetClientAfterStart() throws Exception
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -1417,6 +1417,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AutogroupIdTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/CommitRollbackTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/DeliveryOrderTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageCounterTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageHandlerTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessageRateTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ProducerCloseTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/QueueBrowserTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ReceiveTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RoutingTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCloseTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateAndDeleteQueueTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateConsumerTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionCreateProducerTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TransactionalSendTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -106,7 +106,7 @@
}
- private static final long TIMEOUT_START_SERVER = 10;
+ private static final long TIMEOUT_START_SERVER = 400;
@Override
protected void setUp() throws Exception
@@ -1666,8 +1666,8 @@
null,
groupAddress,
port,
- 5000,
- 5000);
+ 1000,
+ 1000);
configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -420,7 +420,7 @@
locator.close();
}
-
+
public void testMultipleClientSessionFactories() throws Throwable
{
startServers(0, 1, 2, 3, 4);
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -98,7 +98,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.getSubscriptionCount());
@@ -118,7 +118,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"_2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -145,7 +145,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
@@ -171,7 +171,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
String jsonString = topicControl.listDurableSubscriptionsAsJSON();
@@ -179,7 +179,7 @@
Assert.assertEquals(2, infos.length);
Assert.assertEquals(clientID, infos[0].getClientID());
Assert.assertEquals(subscriptionName, infos[0].getName());
- Assert.assertEquals(clientID, infos[1].getClientID());
+ Assert.assertEquals(clientID+"2", infos[1].getClientID());
Assert.assertEquals(subscriptionName + "2", infos[1].getName());
jsonString = topicControl.listNonDurableSubscriptionsAsJSON();
@@ -344,7 +344,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID+"2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -438,7 +438,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -460,7 +460,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
TopicControl topicControl = createManagementControl();
Modified: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-21 19:59:57 UTC (rev 11394)
+++ branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-22 01:37:18 UTC (rev 11395)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.jms.server.management;
-import static junit.framework.Assert.assertEquals;
import static org.hornetq.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
@@ -30,9 +29,7 @@
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
@@ -96,7 +93,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, proxy.retrieveAttributeValue("subscriptionCount"));
Assert.assertEquals(1, proxy.retrieveAttributeValue("nonDurableSubscriptionCount"));
@@ -115,7 +112,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(0, proxy.retrieveAttributeValue("messageCount"));
Assert.assertEquals(0, proxy.retrieveAttributeValue("nonDurableMessageCount"));
@@ -140,7 +137,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, ((Object[])proxy.invokeOperation("listAllSubscriptions")).length);
Assert.assertEquals(1, ((Object[])proxy.invokeOperation("listNonDurableSubscriptions")).length);
@@ -254,7 +251,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber durableSubscriber_2 = JMSUtil.createDurableSubscriber(connection_2,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2");
Assert.assertEquals(2, proxy.retrieveAttributeValue("subscriptionCount"));
@@ -276,7 +273,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID + "2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -344,7 +341,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
@@ -370,7 +367,7 @@
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2",
Session.CLIENT_ACKNOWLEDGE);
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/integration/management/SecurityManagementTestBase.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
Property changes on: branches/HORNETQ-316_for_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
-
13 years, 3 months
JBoss hornetq SVN: r11394 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:59:57 -0400 (Wed, 21 Sep 2011)
New Revision: 11394
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
fixing a test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-21 19:44:31 UTC (rev 11393)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-21 19:59:57 UTC (rev 11394)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.jms.server.management;
-import static junit.framework.Assert.assertEquals;
import static org.hornetq.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
@@ -30,9 +29,7 @@
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
@@ -96,7 +93,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, proxy.retrieveAttributeValue("subscriptionCount"));
Assert.assertEquals(1, proxy.retrieveAttributeValue("nonDurableSubscriptionCount"));
@@ -115,7 +112,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(0, proxy.retrieveAttributeValue("messageCount"));
Assert.assertEquals(0, proxy.retrieveAttributeValue("nonDurableMessageCount"));
@@ -140,7 +137,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, ((Object[])proxy.invokeOperation("listAllSubscriptions")).length);
Assert.assertEquals(1, ((Object[])proxy.invokeOperation("listNonDurableSubscriptions")).length);
@@ -254,7 +251,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber durableSubscriber_2 = JMSUtil.createDurableSubscriber(connection_2,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2");
Assert.assertEquals(2, proxy.retrieveAttributeValue("subscriptionCount"));
@@ -276,7 +273,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID + "2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -344,7 +341,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
@@ -370,7 +367,7 @@
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2",
Session.CLIENT_ACKNOWLEDGE);
13 years, 3 months
JBoss hornetq SVN: r11393 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:44:31 -0400 (Wed, 21 Sep 2011)
New Revision: 11393
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/config/common/hornetq-version.properties
Log:
changing version
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/config/common/hornetq-version.properties 2011-09-21 19:43:20 UTC (rev 11392)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/config/common/hornetq-version.properties 2011-09-21 19:44:31 UTC (rev 11393)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-6871
+hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7242
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
13 years, 3 months
JBoss hornetq SVN: r11392 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:43:20 -0400 (Wed, 21 Sep 2011)
New Revision: 11392
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
JBPAPP-7242 - fixing test
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-21 19:40:59 UTC (rev 11391)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2011-09-21 19:43:20 UTC (rev 11392)
@@ -96,7 +96,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, proxy.retrieveAttributeValue("subscriptionCount"));
Assert.assertEquals(1, proxy.retrieveAttributeValue("nonDurableSubscriptionCount"));
@@ -115,7 +115,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(0, proxy.retrieveAttributeValue("messageCount"));
Assert.assertEquals(0, proxy.retrieveAttributeValue("nonDurableMessageCount"));
@@ -140,7 +140,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
Assert.assertEquals(3, ((Object[])proxy.invokeOperation("listAllSubscriptions")).length);
Assert.assertEquals(1, ((Object[])proxy.invokeOperation("listNonDurableSubscriptions")).length);
@@ -254,7 +254,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber durableSubscriber_2 = JMSUtil.createDurableSubscriber(connection_2,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2");
Assert.assertEquals(2, proxy.retrieveAttributeValue("subscriptionCount"));
@@ -276,7 +276,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID + "2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -344,7 +344,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
@@ -370,7 +370,7 @@
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3,
topic,
- clientID,
+ clientID + "2",
subscriptionName + "2",
Session.CLIENT_ACKNOWLEDGE);
13 years, 3 months
JBoss hornetq SVN: r11391 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:40:59 -0400 (Wed, 21 Sep 2011)
New Revision: 11391
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
lowering a log message from error to debug level
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21 19:40:59 UTC (rev 11391)
@@ -1049,7 +1049,10 @@
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " + consumerID);
+ if (log.isDebugEnabled())
+ {
+ ServerSessionImpl.log.debug("There is no consumer with id " + consumerID);
+ }
return;
}
13 years, 3 months