Author: clebert.suconic(a)jboss.com
Date: 2011-06-14 01:08:28 -0400 (Tue, 14 Jun 2011)
New Revision: 10796
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Cluster cleanup
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -406,10 +406,10 @@
public void causeExit()
{
- exitLoop = true;
synchronized (waitLock)
{
- waitLock.notify();
+ exitLoop = true;
+ waitLock.notifyAll();
}
}
@@ -420,7 +420,7 @@
return;
}
- // we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
+ // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
causeExit();
synchronized (createSessionLock)
{
@@ -764,6 +764,8 @@
ChannelHandler handler = new ClientSessionPacketHandler(session,
sessionChannel);
sessionChannel.setHandler(handler);
+
+ log.info("Creating session " + session);
return new DelegatingSession(session);
}
@@ -889,13 +891,8 @@
synchronized (waitLock)
{
- while (true)
+ while (!exitLoop)
{
- if (exitLoop)
- {
- return;
- }
-
if (log.isDebugEnabled())
{
log.debug("Trying reconnection attempt " + count);
@@ -927,12 +924,12 @@
try
{
- waitLock.wait(interval);
+ waitLock.wait(interval);
}
catch (InterruptedException ignore)
{
}
-
+
// Exponential back-off
long newInterval = (long)(interval * retryIntervalMultiplier);
@@ -1283,6 +1280,21 @@
if (type == PacketImpl.DISCONNECT)
{
+// ClientSessionFactoryImpl.this.closed = true;
+//
+// for (ClientSessionInternal session : ClientSessionFactoryImpl.this.sessions)
+// {
+// try
+// {
+// log.info("cleanup session on Factory " + session);
+// session.cleanUp(false);
+// }
+// catch (Exception e)
+// {
+// log.warn("Error cleaning up session " + session, e);
+// }
+// }
+//
final DisconnectMessage msg = (DisconnectMessage)packet;
closeExecutor.execute(new Runnable()
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -843,8 +843,11 @@
{
if (closed)
{
+ log.info("Session was already closed, giving up now, this=" + this);
return;
}
+
+ log.info("Calling close on session " + this);
try
{
@@ -1607,6 +1610,10 @@
return remotingConnection;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
@@ -1614,7 +1621,8 @@
{
buffer.append(entry.getKey() + "=" + entry.getValue() +
",");
}
- return "ClientSessionImpl::(" + buffer.toString() + ")";
+
+ return "ClientSessionImpl [name=" + name + ", username=" + username + ", closed=" + closed + " metaData=(" + buffer + ")]@" + Integer.toHexString(hashCode()) ;
}
// Protected
@@ -1776,13 +1784,25 @@
private void doCleanup(boolean failingOver)
{
remotingConnection.removeFailureListener(this);
+
+ log.info("calling cleanup on " + this);
synchronized (this)
{
closed = true;
channel.close();
+
+
+ log.info("Calling unblock on " + this);
+
+ // if the server is sending a disconnect
+ // any pending blocked operation could hang without this
+ channel.returnBlocking();
}
+
+
+ log.info("Cleaned up " + this);
sessionFactory.removeSession(this, failingOver);
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -578,4 +578,10 @@
{
return session.isCompressLargeMessages();
}
+
+ public String toString()
+ {
+ return "DelegatingSession [session=" + session + "]";
+ }
+
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -242,24 +242,33 @@
public void stop() throws Exception
{
- if (started)
- {
- executor.execute(new Runnable()
- {
- public void run()
- {
- // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
- // trying to connect to the target
- // server which isn't up in an infinite loop
- if (csf != null)
- {
- //csf.close();
- csf = null;
- }
- }
- });
- }
+ // TODO: Remove this during merge:
+ // If we close the csf, at the same time we could have the bridge calling close and proper cancellations (not just reseting as we used to do)
+ // we could have either Dead locks or very slow shutdowns on the testsuite.
+ // The solution I could find so far was to just leave the csf on
+
+ // TODO: Need to find a better way to close the CSF
+
+
+// if (started)
+// {
+// executor.execute(new Runnable()
+// {
+// public void run()
+// {
+// // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
+// // trying to connect to the target
+// // server which isn't up in an infinite loop
+// if (csf != null)
+// {
+// csf.close();
+// csf = null;
+// }
+// }
+// });
+// }
+//
log.info("Bridge " + this.name + " being stopped");
stopping = true;
@@ -590,7 +599,18 @@
csf = createSessionFactory();
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
+
+ try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
+
if (forwardingAddress != null)
{
BindingQuery query = null;
@@ -661,6 +681,8 @@
}
catch (HornetQException e)
{
+ e.printStackTrace();
+ System.out.println("ex " + e);
if (csf != null)
{
csf.close();
@@ -707,14 +729,18 @@
private class StopRunnable implements Runnable
{
+ Exception created = new Exception ("Stop bridge called at for session = " + session);
public void run()
{
try
{
// We need to close the session outside of the lock,
// so any pending operation will be canceled right away
+ csf = null;
if (session != null)
{
+ log.info("Stopping bridge called at ", created);
+ log.info("Cleaning up session " + session);
session.close();
session.removeFailureListener(BridgeImpl.this);
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -149,7 +149,6 @@
{
if (flowRecord != null)
{
- // TODO: can I really remove this? nope
flowRecord.reset();
if (notifConsumer != null)
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -63,13 +63,11 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -159,6 +157,8 @@
private final MBeanServer mbeanServer;
private volatile boolean started;
+
+ private volatile boolean stopped;
private volatile SecurityStore securityStore;
@@ -353,6 +353,11 @@
nodeManager.startLiveNode();
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
log.info("Server is now live");
@@ -379,6 +384,8 @@
private class SharedStoreBackupActivation implements Activation
{
+
+ volatile boolean closed = false;
public void run()
{
try
@@ -397,6 +404,11 @@
configuration.setBackup(false);
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
clusterManager.activate();
@@ -480,6 +492,7 @@
backupActivationThread.interrupt();
+ // TODO: do we really need this?
Thread.sleep(1000);
}
@@ -536,6 +549,8 @@
public synchronized void start() throws Exception
{
+ stopped = false;
+
initialiseLogging();
checkJournalDirectory();
@@ -618,6 +633,7 @@
public void stop() throws Exception
{
+ stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
@@ -1463,7 +1479,13 @@
private void initialisePart2() throws Exception
{
// Load the journal and populate queues, transactions and caches in memory
-
+
+
+ if (stopped)
+ {
+ return;
+ }
+
pagingManager.reloadStores();
JournalLoadInformation[] journalInfo = loadJournals();
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -21,10 +21,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -1812,6 +1812,7 @@
* we need to wait a lil while between server start up to allow the server to
communicate in some order.
* This is to avoid split brain on startup
* */
+ // TODO: Do we really need this?
Thread.sleep(500);
}
for (int node : nodes)
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -140,6 +140,7 @@
staticConnectors, false);
backupConfig.getClusterConfigurations().add(cccLive);
backupServer = createBackupServer();
+ backupServer.getServer().setIdentity("bkpIdentityServer");
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -971,7 +971,15 @@
}
// We shutdown the global pools to give a better isolation between tests
- ServerLocatorImpl.clearThreadPools();
+ try
+ {
+ ServerLocatorImpl.clearThreadPools();
+ }
+ catch (Throwable e)
+ {
+ log.info(threadDump(e.getMessage()));
+ throw new RuntimeException (e.getMessage(), e);
+ }
}
protected byte[] autoEncode(final Object... args)