JBoss hornetq SVN: r11604 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 08:14:51 -0400 (Wed, 26 Oct 2011)
New Revision: 11604
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
(i) Avoid locking the locator to close it, (ii) track factories at static connector with connectingFactory,
(iii) add 'state' field and remove all other fields used to track state.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 12:05:13 UTC (rev 11603)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 12:14:51 UTC (rev 11604)
@@ -59,6 +59,11 @@
{
private static final long serialVersionUID = -1615857864410205260L;
+ private enum STATE
+ {
+ INITIALIZED, CLOSING, CLOSED,
+ };
+
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
private final boolean ha;
@@ -97,8 +102,6 @@
private ConnectionLoadBalancingPolicy loadBalancingPolicy;
- private boolean readOnly;
-
// Settable attributes:
private boolean cacheLargeMessagesClient;
@@ -155,10 +158,8 @@
private int initialMessagePacketSize;
- private volatile boolean closed;
+ private volatile STATE state;
- private volatile boolean closing;
-
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -332,10 +333,14 @@
private synchronized void initialise() throws HornetQException
{
- if (readOnly)
+ if (state == STATE.INITIALIZED)
{
return;
}
+ if (state == STATE.CLOSING)
+ {
+ throw new IllegalStateException("Cannot initialize 'closing' locator");
+ }
try
{
setThreadPools();
@@ -369,7 +374,7 @@
discoveryGroup.start();
}
- readOnly = true;
+ state = STATE.INITIALIZED;
}
catch (Exception e)
{
@@ -550,7 +555,7 @@
}
catch (Exception e)
{
- if (!closing)
+ if (!isClosed())
{
log.warn("did not connect the cluster connection to other nodes", e);
}
@@ -600,21 +605,20 @@
public boolean isClosed()
{
- return closed || closing;
+ return state == STATE.CLOSED || state == STATE.CLOSING;
}
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
assertOpen();
- initialise();
+ initialise();
synchronized (this)
{
assertOpen();
ClientSessionFactoryInternal factory =
- new ClientSessionFactoryImpl(this,
- transportConfiguration,
+ new ClientSessionFactoryImpl(this, transportConfiguration,
callTimeout,
clientFailureCheckPeriod,
connectionTTL,
@@ -625,17 +629,36 @@
threadPool,
scheduledThreadPool,
interceptors);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addFactory(factory);
+ return factory;
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
+ }
+ }
+
+ private void removeFromConnecting(ClientSessionFactoryInternal factory)
+ {
+ connectingFactories.remove(factory);
+ }
+
+ private void addToConnecting(ClientSessionFactoryInternal factory)
+ {
+ synchronized (connectingFactories)
+ {
+ assertOpen();
connectingFactories.add(factory);
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
- connectingFactories.remove(factory);
- addFactory(factory);
- return factory;
}
}
-
private void assertOpen()
{
- if (closed || closing)
+ if (state != null && state != STATE.INITIALIZED)
{
throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
}
@@ -677,27 +700,24 @@
try
{
- factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- connectingFactories.add(factory);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- connectingFactories.remove(factory);
+ factory =
+ new ClientSessionFactoryImpl(this, tc, callTimeout, clientFailureCheckPeriod, connectionTTL,
+ retryInterval, retryIntervalMultiplier, maxRetryInterval,
+ reconnectAttempts, threadPool, scheduledThreadPool, interceptors);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
}
catch (HornetQException e)
{
if (factory != null)
{
- connectingFactories.remove(factory);
factory.close();
}
factory = null;
@@ -728,9 +748,7 @@
if (ha || clusterConnection)
{
long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
- !receivedTopology &&
- timeout > System.currentTimeMillis())
+ while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis())
{
// Now wait for the topology
@@ -744,7 +762,7 @@
}
- if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -755,7 +773,6 @@
return factory;
}
-
}
public boolean isHA()
@@ -1108,7 +1125,7 @@
private void checkWrite()
{
- if (readOnly)
+ if (state == STATE.INITIALIZED)
{
throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
}
@@ -1187,7 +1204,7 @@
protected void doClose(final boolean sendClose)
{
- if (closed)
+ if (state == STATE.CLOSED)
{
if (log.isDebugEnabled())
{
@@ -1201,7 +1218,7 @@
log.debug(this + " is calling close", new Exception("trace"));
}
- closing = true;
+ state = STATE.CLOSING;
if (discoveryGroup != null)
{
@@ -1219,28 +1236,33 @@
staticConnector.disconnect();
}
- for (ClientSessionFactoryInternal factory : connectingFactories)
+ synchronized (connectingFactories)
{
- factory.causeExit();
- factory.close();
+ for (ClientSessionFactoryInternal factory : connectingFactories)
+ {
+ factory.causeExit();
+ factory.close();
+ }
+ connectingFactories.clear();
}
- synchronized (this)
+
+ synchronized (factories)
{
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+ Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- for (ClientSessionFactory factory : clonedFactory)
- {
- if (sendClose)
+ for (ClientSessionFactory factory : clonedFactory)
{
- factory.close();
+ if (sendClose)
+ {
+ factory.close();
+ }
+ else
+ {
+ factory.cleanup();
+ }
}
- else
- {
- factory.cleanup();
- }
- }
- factories.clear();
+ factories.clear();
}
if (shutdownPool)
@@ -1277,9 +1299,7 @@
}
}
}
- readOnly = false;
-
- closed = true;
+ state = STATE.CLOSED;
}
/** This is directly called when the connection to the node is gone,
@@ -1370,9 +1390,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -1385,21 +1402,19 @@
discoveryGroupConfiguration +
"]";
}
- else
- {
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+ return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
", discoveryGroupConfiguration=" +
discoveryGroupConfiguration +
"]";
- }
}
private synchronized void updateArraysAndPairs()
{
+ assertOpen();
Collection<TopologyMember> membersCopy = topology.getMembers();
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- membersCopy.size());
+ topologyArray =
+ (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, membersCopy.size());
int count = 0;
for (TopologyMember pair : membersCopy)
@@ -1472,20 +1487,22 @@
topology.removeClusterTopologyListener(listener);
}
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ private synchronized void addFactory(ClientSessionFactoryInternal factory)
{
if (factory == null)
{
return;
}
- if (closed || closing)
+ synchronized (factories)
{
- factory.close();
- return;
- }
+ if (isClosed())
+ {
+ factory.close();
+ return;
+ }
- TransportConfiguration backup = null;
+ TransportConfiguration backup = null;
if (topology != null)
{
@@ -1494,7 +1511,7 @@
factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
factories.add(factory);
-
+ }
}
final class StaticConnector implements Serializable
@@ -1505,7 +1522,7 @@
public ClientSessionFactory connect() throws HornetQException
{
- if (closed || closing)
+ if (isClosed())
{
throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
}
@@ -1520,7 +1537,7 @@
{
int retryNumber = 0;
- while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
+ while (csf == null && isClosed())
{
retryNumber++;
for (Connector conn : connectors)
@@ -1573,7 +1590,7 @@
break;
}
- if (!closed && !closing)
+ if (!isClosed())
{
Thread.sleep(retryInterval);
}
@@ -1586,7 +1603,7 @@
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
}
- if (csf == null && !closed)
+ if (csf == null && !isClosed())
{
log.warn("Failed to connecto to any static connector, throwing exception now");
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
@@ -1647,7 +1664,7 @@
@Override
public void finalize() throws Throwable
{
- if (!closed && finalizeCheck)
+ if (state != STATE.CLOSED && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
System.identityHashCode(this));
@@ -1692,7 +1709,16 @@
ClientSessionFactoryInternal factoryToUse = factory;
if (factoryToUse != null)
{
- factory.connect(1, false);
+ addToConnecting(factoryToUse);
+
+ try
+ {
+ factoryToUse.connect(1, false);
+ }
+ finally
+ {
+ removeFromConnecting(factoryToUse);
+ }
}
return factoryToUse;
}
@@ -1715,9 +1741,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
13 years, 4 months
JBoss hornetq SVN: r11603 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 08:05:13 -0400 (Wed, 26 Oct 2011)
New Revision: 11603
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Avoid locking the locator inside close() as that can also cause dead-locks.
The "synchronized" in createFactory() should be revisited and probably deleted.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 11:06:55 UTC (rev 11602)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 12:05:13 UTC (rev 11603)
@@ -599,7 +599,7 @@
return afterConnectListener;
}
- public synchronized ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
assertOpen();
@@ -1222,30 +1222,30 @@
{
for (ClientSessionFactoryInternal csf : connectingFactories)
{
- csf.causeExit();
- csf.close();
+ csf.close();
}
connectingFactories.clear();
}
- synchronized (this)
+ synchronized (factories)
{
Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- for (ClientSessionFactory factory : clonedFactory)
- {
- if (sendClose)
+ for (ClientSessionFactory factory : clonedFactory)
{
- factory.close();
+ if (sendClose)
+ {
+ factory.close();
+ }
+ else
+ {
+ factory.cleanup();
+ }
}
- else
- {
- factory.cleanup();
- }
+
+ factories.clear();
}
- factories.clear();
-
if (shutdownPool)
{
if (threadPool != null)
@@ -1282,8 +1282,8 @@
}
readOnly = false;
- state = STATE.CLOSED;
- }
+ state = STATE.CLOSED;
+
}
/** This is directly called when the connection to the node is gone,
@@ -1477,28 +1477,31 @@
topology.removeClusterTopologyListener(listener);
}
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ private synchronized void addFactory(ClientSessionFactoryInternal factory)
{
if (factory == null)
{
return;
}
- if (isClosed())
+ synchronized (factories)
{
- factory.close();
- return;
- }
+ if (isClosed())
+ {
+ factory.close();
+ return;
+ }
- TransportConfiguration backup = null;
+ TransportConfiguration backup = null;
- if (topology != null)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ if (topology != null)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ }
+
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factories.add(factory);
}
-
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
}
class StaticConnector implements Serializable
13 years, 4 months
JBoss hornetq SVN: r11602 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 07:06:55 -0400 (Wed, 26 Oct 2011)
New Revision: 11602
Modified:
trunk/pom.xml
Log:
Run the concurrent tests on jenkins.
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-10-26 10:48:55 UTC (rev 11601)
+++ trunk/pom.xml 2011-10-26 11:06:55 UTC (rev 11602)
@@ -392,7 +392,7 @@
<skipJoramTests>false</skipJoramTests>
<skipIntegrationTests>false</skipIntegrationTests>
<skipTimingTests>true</skipTimingTests>
- <skipConcurrentTests>true</skipConcurrentTests>
+ <skipConcurrentTests>false</skipConcurrentTests>
<skipStressTests>true</skipStressTests>
<skipSoakTests>true</skipSoakTests>
<skipPerformanceTests>true</skipPerformanceTests>
13 years, 4 months
JBoss hornetq SVN: r11601 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 06:48:55 -0400 (Wed, 26 Oct 2011)
New Revision: 11601
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fix incorrect check for isClosed()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:25:21 UTC (rev 11600)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:48:55 UTC (rev 11601)
@@ -747,7 +747,7 @@
}
- if (System.currentTimeMillis() > timeout && !receivedTopology && !isClosed())
+ if (System.currentTimeMillis() > timeout && !receivedTopology)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -1486,7 +1486,6 @@
if (isClosed())
{
- factory.causeExit();
factory.close();
return;
}
13 years, 4 months
JBoss hornetq SVN: r11600 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 06:25:21 -0400 (Wed, 26 Oct 2011)
New Revision: 11600
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fix race & dead-locking between ServerLocatorImpl.close() and the creation of new SessionFactories.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:25:06 UTC (rev 11599)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:25:21 UTC (rev 11600)
@@ -57,6 +57,8 @@
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
+ private enum STATE{ INITIALIZED, CLOSED, CLOSING};
+
private static final long serialVersionUID = -1615857864410205260L;
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -69,7 +71,8 @@
private transient String identity;
- private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -154,10 +157,8 @@
private int initialMessagePacketSize;
- private volatile boolean closed;
+ private volatile STATE state;
- private volatile boolean closing;
-
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -252,7 +253,7 @@
return globalScheduledThreadPool;
}
- private void setThreadPools()
+ private synchronized void setThreadPools()
{
if (threadPool != null)
{
@@ -329,10 +330,16 @@
});
}
- private synchronized void initialise() throws Exception
+ private synchronized void initialise() throws HornetQException
{
- if (!readOnly)
+ if (readOnly)
{
+ return;
+ }
+
+ try
+ {
+ state = STATE.INITIALIZED;
setThreadPools();
instantiateLoadBalancingPolicy();
@@ -366,6 +373,11 @@
readOnly = true;
}
+ catch (Exception e)
+ {
+ state = null;
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
}
private ServerLocatorImpl(final Topology topology,
@@ -515,14 +527,12 @@
return pair.getA();
}
- else
- {
- // Get from initialconnectors
+
+ // Get from initialconnectors
- int pos = loadBalancingPolicy.select(initialConnectors.length);
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
- return initialConnectors[pos];
- }
+ return initialConnectors[pos];
}
public void start(Executor executor) throws Exception
@@ -541,7 +551,7 @@
}
catch (Exception e)
{
- if (!closing)
+ if (!isClosed())
{
log.warn("did not connect the cluster connection to other nodes", e);
}
@@ -565,19 +575,15 @@
public ClientSessionFactoryInternal connect() throws Exception
{
- ClientSessionFactoryInternal sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
}
// wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
+ return (ClientSessionFactoryInternal)createSessionFactory();
}
/* (non-Javadoc)
@@ -593,27 +599,12 @@
return afterConnectListener;
}
- public boolean isClosed()
+ public synchronized ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
- return closed || closing;
- }
+ assertOpen();
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
+ initialise();
+
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
@@ -627,29 +618,39 @@
scheduledThreadPool,
interceptors);
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addFactory(factory);
+ return factory;
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
+ }
- addFactory(factory);
-
- return factory;
+ private void removeFromConnecting(ClientSessionFactoryInternal factory)
+ {
+ connectingFactories.remove(factory);
}
- public ClientSessionFactory createSessionFactory() throws Exception
+ private void addToConnecting(ClientSessionFactoryInternal factory)
{
- if (closed || closing)
+ synchronized (connectingFactories)
{
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ assertOpen();
+ connectingFactories.add(factory);
}
+ }
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ assertOpen();
+ initialise();
+
if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
@@ -691,7 +692,15 @@
threadPool,
scheduledThreadPool,
interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ try
+ {
+ addToConnecting(factory);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
}
catch (HornetQException e)
{
@@ -723,10 +732,8 @@
if (ha || clusterConnection)
{
- long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
- !receivedTopology &&
- timeout > System.currentTimeMillis())
+ final long timeout = System.currentTimeMillis() + 30000;
+ while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis())
{
// Now wait for the topology
@@ -740,7 +747,7 @@
}
- if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology && !isClosed())
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -1184,7 +1191,7 @@
protected void doClose(final boolean sendClose)
{
- if (closed)
+ if (state == STATE.CLOSED)
{
if (log.isDebugEnabled())
{
@@ -1193,13 +1200,8 @@
return;
}
- if (log.isDebugEnabled())
- {
- log.debug(this + " is calling close", new Exception("trace"));
- }
+ state = STATE.CLOSING;
- closing = true;
-
if (discoveryGroup != null)
{
try
@@ -1215,9 +1217,21 @@
{
staticConnector.disconnect();
}
+
+ synchronized (connectingFactories)
+ {
+ for (ClientSessionFactoryInternal csf : connectingFactories)
+ {
+ csf.causeExit();
+ csf.close();
+ }
+ connectingFactories.clear();
+ }
+
+ synchronized (this)
+ {
+ Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
for (ClientSessionFactory factory : clonedFactory)
{
if (sendClose)
@@ -1268,7 +1282,8 @@
}
readOnly = false;
- closed = true;
+ state = STATE.CLOSED;
+ }
}
/** This is directly called when the connection to the node is gone,
@@ -1378,13 +1393,10 @@
discoveryGroupConfiguration +
"]";
}
- else
- {
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
+ return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
}
private synchronized void updateArraysAndPairs()
@@ -1467,18 +1479,27 @@
public synchronized void addFactory(ClientSessionFactoryInternal factory)
{
- if (factory != null)
+ if (factory == null)
{
- TransportConfiguration backup = null;
+ return;
+ }
- if (topology != null)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
+ if (isClosed())
+ {
+ factory.causeExit();
+ factory.close();
+ return;
+ }
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
+ TransportConfiguration backup = null;
+
+ if (topology != null)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
}
+
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factories.add(factory);
}
class StaticConnector implements Serializable
@@ -1489,19 +1510,9 @@
public ClientSessionFactory connect() throws HornetQException
{
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
+ assertOpen();
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
+ initialise();
ClientSessionFactory csf = null;
@@ -1511,7 +1522,7 @@
{
int retryNumber = 0;
- while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
+ while (csf == null && !isClosed())
{
retryNumber++;
for (Connector conn : connectors)
@@ -1564,7 +1575,7 @@
break;
}
- if (!closed && !closing)
+ if (!isClosed())
{
Thread.sleep(retryInterval);
}
@@ -1577,7 +1588,7 @@
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
}
- if (csf == null && !closed)
+ if (csf == null && !isClosed())
{
log.warn("Failed to connecto to any static connector, throwing exception now");
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
@@ -1636,7 +1647,7 @@
public void finalize() throws Throwable
{
- if (!closed && finalizeCheck)
+ if (!isClosed() && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
System.identityHashCode(this));
@@ -1681,7 +1692,15 @@
ClientSessionFactoryInternal factoryToUse = factory;
if (factoryToUse != null)
{
- factory.connect(1, false);
+ try
+ {
+ addToConnecting(factoryToUse);
+ factoryToUse.connect(1, false);
+ }
+ finally
+ {
+ removeFromConnecting(factoryToUse);
+ }
}
return factoryToUse;
}
@@ -1704,9 +1723,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -1715,4 +1731,17 @@
}
}
+
+ private void assertOpen()
+ {
+ if (state != null && state != STATE.INITIALIZED)
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return state != STATE.INITIALIZED;
+ }
}
13 years, 4 months
JBoss hornetq SVN: r11599 - branches/Branch_2_2_EAP.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-26 06:25:06 -0400 (Wed, 26 Oct 2011)
New Revision: 11599
Modified:
branches/Branch_2_2_EAP/.gitignore
Log:
improve gitignore
Modified: branches/Branch_2_2_EAP/.gitignore
===================================================================
--- branches/Branch_2_2_EAP/.gitignore 2011-10-26 09:04:08 UTC (rev 11598)
+++ branches/Branch_2_2_EAP/.gitignore 2011-10-26 10:25:06 UTC (rev 11599)
@@ -7,4 +7,7 @@
/data
/junit*.properties
/target
-.gitignore
\ No newline at end of file
+.gitignore
+tests/build
+hornetq-rest/hornetq-rest/target
+tests/*/build/classes
\ No newline at end of file
13 years, 4 months
JBoss hornetq SVN: r11598 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-26 05:04:08 -0400 (Wed, 26 Oct 2011)
New Revision: 11598
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
Log:
fixed RA tests
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java 2011-10-26 02:53:10 UTC (rev 11597)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java 2011-10-26 09:04:08 UTC (rev 11598)
@@ -22,6 +22,7 @@
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.util.UnitTestCase;
import javax.resource.ResourceException;
import java.util.HashSet;
@@ -44,6 +45,7 @@
public void testSimpleMessageReceivedOnQueueWithSecurityFails() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -74,6 +76,7 @@
roles.add(role);
server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2011-10-26 02:53:10 UTC (rev 11597)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2011-10-26 09:04:08 UTC (rev 11598)
@@ -20,6 +20,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.util.UnitTestCase;
import javax.jms.Message;
import java.util.concurrent.CountDownLatch;
@@ -41,6 +42,7 @@
public void testSimpleMessageReceivedOnQueue() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -72,6 +74,7 @@
public void testInvalidAckMode() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -90,6 +93,7 @@
public void testSimpleMessageReceivedOnQueueInLocalTX() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
qResourceAdapter.setUseLocalTx(true);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
@@ -126,6 +130,7 @@
public void testSimpleMessageReceivedOnQueueWithSelector() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -162,6 +167,7 @@
public void testEndpointDeactivated() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -185,6 +191,7 @@
public void testMaxSessions() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -207,6 +214,7 @@
public void testSimpleTopic() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -237,6 +245,7 @@
public void testDurableSubscription() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -293,6 +302,7 @@
public void testNonDurableSubscription() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -340,6 +350,7 @@
public void testSelectorChangedWithTopic() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -396,6 +407,7 @@
public void testSelectorNotChangedWithTopic() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java 2011-10-26 02:53:10 UTC (rev 11597)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java 2011-10-26 09:04:08 UTC (rev 11598)
@@ -18,6 +18,7 @@
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.UUIDGenerator;
import javax.resource.ResourceException;
@@ -43,6 +44,7 @@
public void testXACommit() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
@@ -74,6 +76,7 @@
public void testXARollback() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2011-10-26 02:53:10 UTC (rev 11597)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2011-10-26 09:04:08 UTC (rev 11598)
@@ -16,6 +16,7 @@
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.util.UnitTestCase;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
@@ -31,6 +32,7 @@
public void testStartStop() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
HornetQRATestBase.MyBootstrapContext ctx = new HornetQRATestBase.MyBootstrapContext();
qResourceAdapter.setTransactionManagerLocatorClass("");
13 years, 4 months
JBoss hornetq SVN: r11597 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-25 22:53:10 -0400 (Tue, 25 Oct 2011)
New Revision: 11597
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/PrintData.java
Log:
Adding debug to print-data
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-25 21:50:31 UTC (rev 11596)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-26 02:53:10 UTC (rev 11597)
@@ -1985,7 +1985,7 @@
}
}
- private OperationContext getContext(final boolean sync)
+ protected OperationContext getContext(final boolean sync)
{
if (sync)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/PrintData.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/PrintData.java 2011-10-25 21:50:31 UTC (rev 11596)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/PrintData.java 2011-10-26 02:53:10 UTC (rev 11597)
@@ -13,6 +13,9 @@
package org.hornetq.core.persistence.impl.journal;
+import java.io.File;
+
+import org.hornetq.core.server.impl.FileLockNodeManager;
/**
* A PrintData
*
@@ -42,6 +45,25 @@
System.exit(-1);
}
+ File serverLockFile = new File(arg[1], "server.lock");
+
+ if (serverLockFile.isFile())
+ {
+ try
+ {
+ FileLockNodeManager fileLock = new FileLockNodeManager(arg[1]);
+ fileLock.start();
+ System.out.println("********************************************");
+ System.out.println("Server's ID=" + fileLock.getNodeId().toString());
+ System.out.println("********************************************");
+ fileLock.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
System.out.println("********************************************");
System.out.println("B I N D I N G S J O U R N A L");
System.out.println("********************************************");
13 years, 4 months
JBoss hornetq SVN: r11596 - in branches/Branch_2_2_EAP/src/main/org/hornetq: jms/client and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-25 17:50:31 -0400 (Tue, 25 Oct 2011)
New Revision: 11596
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
Just adding some logging to debug the resource adapter's recovery
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -885,9 +885,8 @@
}
// Should never get here
- throw new IllegalStateException("Internal Error! ClientSessionFactoryImpl::createSessionInternal " +
- "just reached a condition that was not supposed to happen. " +
- "Please inform this condition to the HornetQ team");
+ throw new IllegalStateException("Internal Error! ClientSessionFactoryImpl::createSessionInternal " + "just reached a condition that was not supposed to happen. "
+ + "Please inform this condition to the HornetQ team");
}
private void callFailureListeners(final HornetQException me, final boolean afterReconnect, final boolean failedOver)
@@ -1293,7 +1292,9 @@
ClientSessionFactoryImpl.log.trace(this + "::Subscribing Topology");
}
- channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
+ channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(),
+ VersionLoader.getVersion()
+ .getIncrementingVersion()));
}
}
@@ -1314,13 +1315,16 @@
/**
* @param channel0
*/
- public void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig)
+ public void sendNodeAnnounce(final long currentEventID,
+ String nodeID,
+ boolean isBackup,
+ TransportConfiguration config,
+ TransportConfiguration backupConfig)
{
Channel channel0 = connection.getChannel(0, -1);
if (ClientSessionFactoryImpl.isDebug)
{
- ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
- ", isBackup=" + isBackup);
+ ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() + ", isBackup=" + isBackup);
}
channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
}
@@ -1489,7 +1493,10 @@
" csf created at\nserverLocator=" +
serverLocator, e);
}
- serverLocator.notifyNodeUp(System.currentTimeMillis(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(System.currentTimeMillis(),
+ topMessage.getNodeID(),
+ topMessage.getPair(),
+ topMessage.isLast());
}
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
@@ -1516,7 +1523,10 @@
" csf created at\nserverLocator=" +
serverLocator, e);
}
- serverLocator.notifyNodeUp(topMessage.getUniqueEventID(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getUniqueEventID(),
+ topMessage.getNodeID(),
+ topMessage.getPair(),
+ topMessage.isLast());
}
}
}
@@ -1642,6 +1652,20 @@
}
/* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "ClientSessionFactoryImpl [serverLocator=" + serverLocator +
+ ", connectorConfig=" +
+ connectorConfig +
+ ", backupConfig=" +
+ backupConfig +
+ "]";
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
*/
public void setReconnectAttempts(final int attempts)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -1687,7 +1687,8 @@
username +
", closed=" +
closed +
- " metaData=(" +
+ ", factory = " + this.sessionFactory +
+ ", metaData=(" +
buffer +
")]@" +
Integer.toHexString(hashCode());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -700,6 +700,24 @@
// Private --------------------------------------------------------------------------------------
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "HornetQConnectionFactory [serverLocator=" + serverLocator +
+ ", clientID=" +
+ clientID +
+ ", dupsOKBatchSize=" +
+ dupsOKBatchSize +
+ ", transactionBatchSize=" +
+ transactionBatchSize +
+ ", readOnly=" +
+ readOnly +
+ "]";
+ }
+
private void checkWrite()
{
if (readOnly)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -13,17 +13,15 @@
package org.hornetq.jms.server.recovery;
-import java.util.Map;
+import java.util.Arrays;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -53,19 +51,25 @@
private static final Object lock = new Object();
private ServerLocator serverLocator;
-
+
private ClientSessionFactory csf;
private XAResource delegate;
private XARecoveryConfig[] xaRecoveryConfigs;
- //private TransportConfiguration currentConnection;
+ // private TransportConfiguration currentConnection;
public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
+ this.xaRecoveryConfigs = xaRecoveryConfigs;
- this.xaRecoveryConfigs = xaRecoveryConfigs;
+ if (log.isDebugEnabled())
+ {
+ log.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
+ ", instance=" +
+ System.identityHashCode(this));
+ }
}
public Xid[] recover(final int flag) throws XAException
@@ -74,10 +78,18 @@
HornetQXAResourceWrapper.log.debug("Recover " + xaResource);
try
{
- return xaResource.recover(flag);
+ Xid[] xids = xaResource.recover(flag);
+
+ if (log.isDebugEnabled() && xids != null && xids.length > 0)
+ {
+ log.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
+ }
+
+ return xids;
}
catch (XAException e)
{
+ log.warn(e.getMessage(), e);
throw check(e);
}
}
@@ -214,7 +226,8 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + csf + " will attempt reconnect on next pass",
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + csf +
+ " will attempt reconnect on next pass",
me);
close();
}
@@ -244,9 +257,9 @@
if (result == null)
{
- //we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
- //all chaos is let loose
- if(retry)
+ // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
+ // all chaos is let loose
+ if (retry)
{
XAException xae = new XAException("Connection unavailable for xa recovery");
xae.errorCode = XAException.XA_RETRY;
@@ -294,6 +307,10 @@
for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
+ }
ClientSession cs = null;
@@ -308,7 +325,13 @@
}
else
{
- cs = csf.createSession(xaRecoveryConfig.getUsername(), xaRecoveryConfig.getPassword(), true, false, false, false, 1);
+ cs = csf.createSession(xaRecoveryConfig.getUsername(),
+ xaRecoveryConfig.getPassword(),
+ true,
+ false,
+ false,
+ false,
+ 1);
}
}
catch (HornetQException e)
@@ -323,10 +346,29 @@
}
return delegate;
- }
+ }
+ log.warn("Can't connect to any hornetq server on recovery " + Arrays.toString(xaRecoveryConfigs));
throw new HornetQException(HornetQException.NOT_CONNECTED);
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "HornetQXAResourceWrapper [serverLocator=" + serverLocator +
+ ", csf=" +
+ csf +
+ ", delegate=" +
+ delegate +
+ ", xaRecoveryConfigs=" +
+ Arrays.toString(xaRecoveryConfigs) +
+ ", instance=" +
+ System.identityHashCode(this) +
+ "]";
+ }
+
/**
* Close the connection
*/
@@ -366,6 +408,8 @@
*/
protected XAException check(final XAException e) throws XAException
{
+ log.warn(e.getMessage(), e);
+
if (e.errorCode == XAException.XA_RETRY)
{
close();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -65,8 +65,24 @@
return true;
}
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
@Override
+ public String toString()
+ {
+ return "XARecoveryConfig [hornetQConnectionFactory=" + hornetQConnectionFactory +
+ ", username=" +
+ username +
+ ", password=" +
+ password +
+ "]";
+ }
+
+ @Override
public int hashCode()
{
int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2011-10-25 18:44:58 UTC (rev 11595)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2011-10-25 21:50:31 UTC (rev 11596)
@@ -54,6 +54,8 @@
public HornetQResourceRecovery register(HornetQConnectionFactory factory, String userName, String password)
{
+ log.debug("registering recovery for factory : " + factory);
+
if(!isRegistered(factory) && registry != null)
{
XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
13 years, 4 months
JBoss hornetq SVN: r11595 - branches.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-10-25 14:44:58 -0400 (Tue, 25 Oct 2011)
New Revision: 11595
Removed:
branches/HORNETQ-316_old/
Log:
delete old branch
13 years, 4 months