[jboss-cvs] JBoss Messaging SVN: r7332 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 15 06:36:19 EDT 2009
Author: timfox
Date: 2009-06-15 06:36:18 -0400 (Mon, 15 Jun 2009)
New Revision: 7332
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1656
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -585,10 +585,6 @@
*/
private void sendCredits(final int credits)
{
- if (trace)
- {
- log.trace("Sending " + credits + " credits back", new Exception ("trace"));
- }
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.client.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
import java.util.ArrayList;
import java.util.Collections;
@@ -90,9 +89,9 @@
// Attributes
// -----------------------------------------------------------------------------------
- //We need to keep the reference to prevent the factory getting gc'd before the sessions are finished being used
+ // We need to keep the reference to prevent the factory getting gc'd before the sessions are finished being used
private final ClientSessionFactory factory;
-
+
private final TransportConfiguration connectorConfig;
private final TransportConfiguration backupConfig;
@@ -151,10 +150,8 @@
private Connector connector;
- private Map<Object, FailedConnectionRunnable> failRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
+ private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
- private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
-
// debug
private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
@@ -189,7 +186,7 @@
final ScheduledExecutorService scheduledThreadPool)
{
this.factory = factory;
-
+
this.connectorConfig = connectorConfig;
this.backupConfig = backupConfig;
@@ -460,24 +457,23 @@
{
closed = true;
}
-
// Public
// ---------------------------------------------------------------------------------------
public void cancelPingerForConnectionID(final Object connectionID)
{
- Pinger pinger = pingRunnables.get(connectionID);
+ Pinger pinger = pingers.get(connectionID);
pinger.close();
}
-
+
@Override
protected void finalize() throws Throwable
{
- //In case user forgets to close it explicitly
+ // In case user forgets to close it explicitly
close();
-
+
super.finalize();
}
@@ -496,7 +492,7 @@
}
private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
- {
+ {
// To prevent recursion
if (inFailoverOrReconnect)
{
@@ -543,7 +539,7 @@
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
boolean done = false;
-
+
if (attemptFailover || reconnectAttempts != 0)
{
lockAllChannel1s();
@@ -592,9 +588,9 @@
{
oldConnections.add(entry.connection);
}
+
+ closePingers();
- closeScheduledRunnables();
-
connections.clear();
refCount = 0;
@@ -643,6 +639,7 @@
else
{
// Fail the old connections so their listeners get called
+
for (RemotingConnection connection : oldConnections)
{
connection.fail(me);
@@ -652,7 +649,9 @@
else
{
// Just fail the connections
-
+
+ closePingers();
+
failConnection(me);
}
@@ -662,28 +661,16 @@
}
}
- private void closeScheduledRunnables()
+ private void closePingers()
{
- for (Object id : new HashSet<Object>(connections.keySet()))
+ for (Pinger pinger: pingers.values())
{
- connections.remove(id);
-
- FailedConnectionRunnable runnable = failRunnables.remove(id);
-
- if (runnable != null)
- {
- runnable.close();
- }
-
- Pinger pingRunnable = pingRunnables.remove(id);
-
- if (pingRunnable != null)
- {
- pingRunnable.close();
- }
+ pinger.close();
}
+
+ pingers.clear();
}
-
+
/*
* Re-attach all pre-existing sessions to new remoting connections
*/
@@ -832,7 +819,7 @@
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
- closeScheduledRunnables();
+ closePingers();
connections.clear();
@@ -951,37 +938,24 @@
// Send the initial ping; we always do this because it contains connectionTTL and clientFailureInterval -
// the server needs this in order to do pinging and failure checking
+ Pinger pinger = new Pinger(conn, clientFailureCheckPeriod, new Channel0Handler(conn), new FailedConnectionAction(conn), 0);
+
+ pingers.put(conn.getID(), pinger);
+
Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
Channel channel0 = conn.getChannel(0, -1, false);
- channel0.setHandler(new Channel0Handler(conn));
-
channel0.send(ping);
if (clientFailureCheckPeriod != -1)
{
- Pinger pinger = new Pinger(conn);
-
Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger,
- connectionTTL / 2,
- connectionTTL / 2,
+ clientFailureCheckPeriod,
+ clientFailureCheckPeriod,
TimeUnit.MILLISECONDS);
- pinger.setFuture(pingerFuture);
-
- pingRunnables.put(conn.getID(), pinger);
-
- FailedConnectionRunnable fcRunnable = new FailedConnectionRunnable(conn);
-
- Future<?> fcFuture = scheduledThreadPool.scheduleAtFixedRate(fcRunnable,
- clientFailureCheckPeriod,
- clientFailureCheckPeriod,
- TimeUnit.MILLISECONDS);
-
- fcRunnable.setFuture(fcFuture);
-
- failRunnables.put(conn.getID(), fcRunnable);
+ pinger.setFuture(pingerFuture);
}
if (debug)
@@ -1008,8 +982,6 @@
return conn;
}
-
-
private void returnConnection(final Object connectionID)
{
ConnectionEntry entry = connections.get(connectionID);
@@ -1093,7 +1065,7 @@
}
private void failConnection(final Object connectionID, final MessagingException me)
- {
+ {
ConnectionEntry entry = connections.get(connectionID);
if (entry != null)
@@ -1103,7 +1075,7 @@
conn.fail(me);
}
}
-
+
private class Channel0Handler implements ChannelHandler
{
private final RemotingConnection conn;
@@ -1117,12 +1089,8 @@
{
final byte type = packet.getType();
- if (type == PING)
+ if (type == PacketImpl.DISCONNECT)
{
- // Do nothing
- }
- else if (type == PacketImpl.DISCONNECT)
- {
threadPool.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1141,57 +1109,29 @@
}
}
- private class FailedConnectionRunnable implements Runnable
+ private class FailedConnectionAction implements Runnable
{
- private boolean closed;
-
private RemotingConnection conn;
- private Future<?> future;
-
- FailedConnectionRunnable(final RemotingConnection conn)
+ FailedConnectionAction(final RemotingConnection conn)
{
this.conn = conn;
}
- public synchronized void setFuture(final Future<?> future)
- {
- this.future = future;
- }
-
public synchronized void run()
{
- if (closed)
- {
- return;
- }
+ final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Did not receive data from server (or ping).");
- if (!conn.isDataReceived())
+ threadPool.execute(new Runnable()
{
- final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Did not receive data from server (or ping).");
-
- threadPool.execute(new Runnable()
+ // Must be executed on different thread
+ public void run()
{
- // Must be executed on different thread
- public void run()
- {
- conn.fail(me);
- }
- });
- }
- else
- {
- conn.clearDataReceived();
- }
+ conn.fail(me);
+ }
+ });
}
-
- public synchronized void close()
- {
- future.cancel(false);
-
- closed = true;
- }
}
private static class ConnectionEntry
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -33,6 +33,8 @@
void replicatePacket(Packet packet, long replicatedChannelID, Runnable action);
void setHandler(ChannelHandler handler);
+
+ ChannelHandler getHandler();
void close();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -62,12 +62,4 @@
void freeze();
Connection getTransportConnection();
-
- boolean isDataReceived();
-
- boolean isDataSent();
-
- void clearDataSent();
-
- void clearDataReceived();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -27,7 +27,10 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
/**
@@ -37,58 +40,105 @@
*
*
*/
-public class Pinger implements Runnable
-{
+public class Pinger implements Runnable, ChannelHandler
+{
private static final Logger log = Logger.getLogger(Pinger.class);
- private volatile boolean closed;
+ private boolean closed;
private RemotingConnection conn;
- private volatile Future<?> future;
+ private Future<?> future;
+
+ private long lastPingReceived;
+
+ private final long expiryPeriod;
+
+ private final ChannelHandler extraHandler;
+
+ private final Runnable connectionFailedAction;
+
+ private final Channel channel0;
+
+ private boolean first = true;
- public Pinger(final RemotingConnection conn)
+ public Pinger(final RemotingConnection conn, final long expiryPeriod, final ChannelHandler extraHandler,
+ final Runnable connectionFailedAction, final long lastPingReceived)
{
this.conn = conn;
+
+ this.expiryPeriod = expiryPeriod;
+
+ this.extraHandler = extraHandler;
+
+ this.connectionFailedAction = connectionFailedAction;
+
+ this.channel0 = conn.getChannel(0, -1, false);
+
+ this.lastPingReceived = lastPingReceived;
+
+ channel0.setHandler(this);
}
-
- public void setFuture(final Future<?> future)
+
+ public synchronized void setFuture(final Future<?> future)
{
this.future = future;
}
-
- public void run()
+
+ public synchronized void handlePacket(final Packet packet)
{
if (closed)
{
return;
}
- //TODO - for now we *always* sent the ping otherwise.
- //Checking dataSent does not work, for the following reason:
- //If a packet is sent just after the last ping, then no ping will be sent the next time.
- //Which means the amount of time between pings can approach 2 * ( 0.5 * client failure check period) = failure check period
- //so, due to time taken to actually travel across network + scheduling difference the client failure checker
- //can easily time out.
+ if (packet.getType() == PacketImpl.PING)
+ {
+ lastPingReceived = System.currentTimeMillis();
+ }
+ else if (extraHandler != null)
+ {
+ extraHandler.handlePacket(packet);
+ }
+ else
+ {
+ throw new IllegalStateException("Invalid packet " + packet.getType());
+ }
+ }
+
+ public synchronized void run()
+ {
+ if (closed)
+ {
+ return;
+ }
-// if (!conn.isDataSent())
-// {
- // We only send a ping if no data has been sent since last ping
-
- Ping ping = new Ping();
-
- Channel channel0 = conn.getChannel(0, -1, false);
-
- channel0.send(ping);
- // }
-
- conn.clearDataSent();
+ if (!first && ( System.currentTimeMillis() - lastPingReceived > expiryPeriod))
+ {
+ connectionFailedAction.run();
+ }
+ else if (!stopPinging)
+ {
+ channel0.send(new Ping());
+ }
+
+ first = false;
}
-
- public void close()
+
+ public synchronized void close()
{
- future.cancel(false);
+ if (future != null)
+ {
+ future.cancel(false);
+ }
closed = true;
}
+
+ private boolean stopPinging;
+
+ public synchronized void stopPinging()
+ {
+ this.stopPinging = true;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -247,11 +247,7 @@
private boolean frozen;
private final Object failLock = new Object();
-
- private volatile boolean dataReceived;
-
- private volatile boolean dataSent;
-
+
// debug only stuff
private boolean createdActive;
@@ -473,8 +469,6 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
- dataReceived = true;
-
final Packet packet = decode(buffer);
synchronized (transferLock)
@@ -1060,8 +1054,6 @@
if (connection.active || packet.isWriteAlways())
{
connection.transportConnection.write(buffer, flush);
-
- connection.dataSent = true;
}
}
finally
@@ -1131,8 +1123,6 @@
connection.transportConnection.write(buffer);
- connection.dataSent = true;
-
long toWait = connection.blockingCallTimeout;
long start = System.currentTimeMillis();
@@ -1213,8 +1203,6 @@
packet.encode(buffer);
connection.transportConnection.write(buffer);
-
- connection.dataSent = true;
}
}
@@ -1277,6 +1265,11 @@
{
this.handler = handler;
}
+
+ public ChannelHandler getHandler()
+ {
+ return handler;
+ }
public void close()
{
@@ -1626,24 +1619,4 @@
}
}
}
-
- public boolean isDataReceived()
- {
- return dataReceived;
- }
-
- public void clearDataReceived()
- {
- dataReceived = false;
- }
-
- public boolean isDataSent()
- {
- return dataSent;
- }
-
- public void clearDataSent()
- {
- dataSent = false;
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ssl/SSLSupport.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.ssl;
@@ -59,41 +59,42 @@
// Public --------------------------------------------------------
public static SSLContext createServerContext(String keystorePath,
- String keystorePassword, String trustStorePath,
- String trustStorePassword) throws Exception
+ String keystorePassword,
+ String trustStorePath,
+ String trustStorePassword) throws Exception
{
// Initialize the SSLContext to work with our key managers.
SSLContext sslContext = SSLContext.getInstance("TLS");
KeyManager[] keyManagers = loadKeyManagers(keystorePath, keystorePassword);
- TrustManager[] trustManagers = loadTrustManager(false, trustStorePath,
- trustStorePassword);
+ TrustManager[] trustManagers = loadTrustManager(false, trustStorePath, trustStorePassword);
sslContext.init(keyManagers, trustManagers, new SecureRandom());
return sslContext;
}
- public static SSLContext createClientContext(String keystorePath,
- String keystorePassword) throws Exception
+ public static SSLContext createClientContext(String keystorePath, String keystorePassword) throws Exception
{
SSLContext context = SSLContext.getInstance("TLS");
KeyManager[] keyManagers = loadKeyManagers(keystorePath, keystorePassword);
- TrustManager[] trustManagers = loadTrustManager(true, null, null);
+ TrustManager[] trustManagers = loadTrustManager(true, null, null);
context.init(keyManagers, trustManagers, new SecureRandom());
return context;
}
- public static SSLContext getInstance(boolean client, String keystorePath,
- String keystorePassword, String trustStorePath,
- String trustStorePassword) throws GeneralSecurityException, Exception
+ public static SSLContext getInstance(boolean client,
+ String keystorePath,
+ String keystorePassword,
+ String trustStorePath,
+ String trustStorePassword) throws GeneralSecurityException, Exception
{
if (client)
{
return createClientContext(keystorePath, keystorePassword);
- } else
+ }
+ else
{
- return createServerContext(keystorePath, keystorePassword,
- trustStorePath, trustStorePassword);
+ return createServerContext(keystorePath, keystorePassword, trustStorePath, trustStorePassword);
}
}
@@ -103,8 +104,7 @@
// Private -------------------------------------------------------
- private static TrustManager[] loadTrustManager(boolean clientMode,
- String trustStorePath, String trustStorePassword) throws Exception
+ private static TrustManager[] loadTrustManager(boolean clientMode, String trustStorePath, String trustStorePassword) throws Exception
{
if (clientMode)
{
@@ -113,13 +113,11 @@
// return a trust manager that trusts all certs
return new TrustManager[] { new X509TrustManager()
{
- public void checkClientTrusted(X509Certificate[] chain,
- String authType)
+ public void checkClientTrusted(X509Certificate[] chain, String authType)
{
}
- public void checkServerTrusted(X509Certificate[] chain,
- String authType)
+ public void checkServerTrusted(X509Certificate[] chain, String authType)
{
}
@@ -128,38 +126,38 @@
return null;
}
} };
- } else
+ }
+ else
{
TrustManagerFactory trustMgrFactory;
- KeyStore trustStore = SSLSupport.loadKeystore(trustStorePath,
- trustStorePassword);
- trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory
- .getDefaultAlgorithm());
+ KeyStore trustStore = SSLSupport.loadKeystore(trustStorePath, trustStorePassword);
+ trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustMgrFactory.init(trustStore);
return trustMgrFactory.getTrustManagers();
}
}
- private static KeyStore loadKeystore(String keystorePath,
- String keystorePassword) throws Exception
+ private static KeyStore loadKeystore(String keystorePath, String keystorePassword) throws Exception
{
assert keystorePath != null;
assert keystorePassword != null;
-
+
KeyStore ks = KeyStore.getInstance("JKS");
InputStream in = null;
try
{
URL keystoreURL = validateStoreURL(keystorePath);
ks.load(keystoreURL.openStream(), keystorePassword.toCharArray());
- } finally
+ }
+ finally
{
if (in != null)
{
try
{
in.close();
- } catch (IOException ignored)
+ }
+ catch (IOException ignored)
{
}
}
@@ -167,11 +165,9 @@
return ks;
}
- private static KeyManager[] loadKeyManagers(String keystorePath,
- String keystorePassword) throws Exception
+ private static KeyManager[] loadKeyManagers(String keystorePath, String keystorePassword) throws Exception
{
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
- .getDefaultAlgorithm());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = loadKeystore(keystorePath, keystorePassword);
kmf.init(ks, keystorePassword.toCharArray());
@@ -181,21 +177,22 @@
private static URL validateStoreURL(String storePath) throws Exception
{
assert storePath != null;
-
+
// First see if this is a URL
try
{
return new URL(storePath);
- } catch (MalformedURLException e)
+ }
+ catch (MalformedURLException e)
{
File file = new File(storePath);
if (file.exists() == true && file.isFile())
{
return file.toURI().toURL();
- } else
+ }
+ else
{
- URL url = Thread.currentThread().getContextClassLoader()
- .getResource(storePath);
+ URL url = Thread.currentThread().getContextClassLoader().getResource(storePath);
if (url != null)
return url;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -41,7 +41,6 @@
import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
@@ -95,10 +94,8 @@
private final ScheduledExecutorService scheduledThreadPool;
- private Map<Object, FailedConnectionRunnable> connectionTTLRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
+ private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
- private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
-
private final int managementConnectorID;
// Static --------------------------------------------------------
@@ -144,7 +141,7 @@
{
return;
}
-
+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (TransportConfiguration info : transportConfigs)
@@ -169,31 +166,31 @@
log.warn("Error instantiating acceptor \"" + info.getFactoryClassName() + "\"", e);
}
}
-
- //We now create a "special" acceptor used by management to send/receive management messages - this is an invm
- //acceptor with a -ve server id
- //TODO this is not the best solution, management should send/receive management messages direct.
- //Remove this code when this is implemented without having to require a special acceptor
- //https://jira.jboss.org/jira/browse/JBMESSAGING-1649
-
+
+ // We now create a "special" acceptor used by management to send/receive management messages - this is an invm
+ // acceptor with a -ve server id
+ // TODO this is not the best solution, management should send/receive management messages direct.
+ // Remove this code when this is implemented without having to require a special acceptor
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1649
+
if (config.isJMXManagementEnabled())
{
Map<String, Object> params = new HashMap<String, Object>();
-
+
params.put(TransportConstants.SERVER_ID_PROP_NAME, managementConnectorID);
-
+
AcceptorFactory factory = new InVMAcceptorFactory();
-
+
Acceptor acceptor = factory.createAcceptor(params, bufferHandler, this, threadPool);
-
+
acceptors.add(acceptor);
-
+
if (managementService != null)
{
TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
-
+
managementService.registerAcceptor(acceptor, info);
- }
+ }
}
for (Acceptor a : acceptors)
@@ -239,16 +236,11 @@
acceptors.clear();
- for (FailedConnectionRunnable runnable : connectionTTLRunnables.values())
+ for (Pinger runnable : pingers.values())
{
runnable.close();
}
- for (Pinger runnable : pingRunnables.values())
- {
- runnable.close();
- }
-
connections.clear();
started = false;
@@ -296,21 +288,14 @@
channel1.setHandler(handler);
- Channel channel0 = rc.getChannel(0, -1, false);
+ connections.put(connection.getID(), rc);
- Channel0Handler channel0Handler = new Channel0Handler(rc);
+ InitialPingTimeout runnable = new InitialPingTimeout(rc);
- channel0.setHandler(channel0Handler);
-
- Object id = connection.getID();
-
- connections.put(id, rc);
-
- InitialPingTimeout runnable = new InitialPingTimeout(rc, channel0Handler);
-
// We schedule an initial ping timeout. An initial ping is always sent from the client as the first thing it
// does after creating a connection, this contains the ping period and connection TTL, if it doesn't
// arrive the connection will get closed
+
scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
if (config.isBackup())
@@ -322,7 +307,7 @@
public void connectionDestroyed(final Object connectionID)
{
RemotingConnection conn = connections.get(connectionID);
-
+
if (conn != null)
{
// if the connection has no failure listeners it means the sessions etc were already closed so this is a clean
@@ -362,11 +347,11 @@
// Public --------------------------------------------------------
- public void cancelPingerForConnectionID(final Object connectionID)
+ public void stopPingingForConnectionID(final Object connectionID)
{
- Pinger pinger = pingRunnables.get(connectionID);
+ Pinger pinger = pingers.get(connectionID);
- pinger.close();
+ pinger.stopPinging();
}
// Package protected ---------------------------------------------
@@ -375,7 +360,7 @@
// Private -------------------------------------------------------
- private void setupScheduledRunnables(final RemotingConnection conn,
+ private void setupPinger(final RemotingConnection conn,
final long clientFailureCheckPeriod,
final long connectionTTL)
{
@@ -392,72 +377,47 @@
long connectionTTLToUse = config.getConnectionTTLOverride() != -1 ? config.getConnectionTTLOverride()
: connectionTTL;
- if (connectionTTLToUse != -1)
- {
- FailedConnectionRunnable runnable = new FailedConnectionRunnable(conn);
-
- Future<?> connectionTTLFuture = scheduledThreadPool.scheduleAtFixedRate(runnable,
- connectionTTLToUse,
- connectionTTLToUse,
- TimeUnit.MILLISECONDS);
-
- runnable.setFuture(connectionTTLFuture);
-
- connectionTTLRunnables.put(conn.getID(), runnable);
- }
-
long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
- if (pingPeriod != -1)
- {
- Pinger pingRunnable = new Pinger(conn);
+ Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
- Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable,
- 0,
- pingPeriod,
- TimeUnit.MILLISECONDS);
+ Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
- pingRunnable.setFuture(pingFuture);
+ pingRunnable.setFuture(pingFuture);
- pingRunnables.put(conn.getID(), pingRunnable);
- }
+ pingers.put(conn.getID(), pingRunnable);
}
private RemotingConnection closeConnection(final Object connectionID)
{
RemotingConnection connection = connections.remove(connectionID);
+
+ Pinger pinger = pingers.remove(connectionID);
- FailedConnectionRunnable runnable = connectionTTLRunnables.remove(connectionID);
-
- if (runnable != null)
+ if (pinger != null)
{
- runnable.close();
+ pinger.close();
}
- Pinger pingRunnable = pingRunnables.remove(connectionID);
-
- if (pingRunnable != null)
- {
- pingRunnable.close();
- }
-
return connection;
}
// Inner classes -------------------------------------------------
- private class Channel0Handler implements ChannelHandler
+ private class InitialPingTimeout implements Runnable, ChannelHandler
{
private final RemotingConnection conn;
- private volatile boolean gotInitialPing;
+ private boolean gotInitialPing;
- private Channel0Handler(final RemotingConnection conn)
+ private InitialPingTimeout(final RemotingConnection conn)
{
this.conn = conn;
+
+ conn.getChannel(0, -1, false).setHandler(this);
}
-
- public void handlePacket(final Packet packet)
+
+ public synchronized void handlePacket(final Packet packet)
{
final byte type = packet.getType();
@@ -467,7 +427,7 @@
{
Ping ping = (Ping)packet;
- setupScheduledRunnables(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
+ setupPinger(conn, ping.getClientFailureCheckPeriod(), ping.getConnectionTTL());
gotInitialPing = true;
}
@@ -478,28 +438,9 @@
}
}
- private boolean isGotInitialPing()
+ public synchronized void run()
{
- return gotInitialPing;
- }
- }
-
- private class InitialPingTimeout implements Runnable
- {
- private final RemotingConnection conn;
-
- private final Channel0Handler handler;
-
- private InitialPingTimeout(final RemotingConnection conn, final Channel0Handler handler)
- {
- this.conn = conn;
-
- this.handler = handler;
- }
-
- public void run()
- {
- if (!handler.isGotInitialPing())
+ if (!gotInitialPing)
{
// Never received initial ping
log.warn("Did not receive initial ping for connection, it will be closed");
@@ -511,52 +452,24 @@
}
}
- private class FailedConnectionRunnable implements Runnable
+ private class FailedConnectionAction implements Runnable
{
- private boolean closed;
-
private RemotingConnection conn;
- private Future<?> future;
-
- FailedConnectionRunnable(final RemotingConnection conn)
+ FailedConnectionAction(final RemotingConnection conn)
{
this.conn = conn;
}
- public synchronized void setFuture(final Future<?> future)
- {
- this.future = future;
- }
-
public synchronized void run()
{
- if (closed)
- {
- return;
- }
+ removeConnection(conn.getID());
- if (!conn.isDataReceived())
- {
- removeConnection(conn.getID());
+ MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
- MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
-
- conn.fail(me);
- }
- else
- {
- conn.clearDataReceived();
- }
+ conn.fail(me);
}
-
- public synchronized void close()
- {
- future.cancel(false);
-
- closed = true;
- }
}
private class DelegatingBufferHandler extends AbstractBufferHandler
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -1160,7 +1160,7 @@
{
try
{
- log.warn("Client connection failed, clearing up resources for session " + name, new Exception());
+ log.warn("Client connection failed, clearing up resources for session " + name);
for (Runnable runner : failureRunners)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-06-15 09:55:22 UTC (rev 7331)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-06-15 10:36:18 UTC (rev 7332)
@@ -314,7 +314,7 @@
serverConn.addFailureListener(serverListener);
- ((RemotingServiceImpl)server.getRemotingService()).cancelPingerForConnectionID(serverConn.getID());
+ ((RemotingServiceImpl)server.getRemotingService()).stopPingingForConnectionID(serverConn.getID());
for (int i = 0; i < 1000; i++)
{
More information about the jboss-cvs-commits
mailing list