[jboss-cvs] JBoss Messaging SVN: r7198 - in trunk: src/main/org/jboss/messaging/core/remoting/server/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 4 14:04:05 EDT 2009
Author: timfox
Date: 2009-06-04 14:04:04 -0400 (Thu, 04 Jun 2009)
New Revision: 7198
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
ping changes part 2
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -871,6 +871,11 @@
updateConnectionManagerArray();
}
+
+ public ConnectionManager[] getConnectionManagers()
+ {
+ return connectionManagerArray;
+ }
// Protected ------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
/**
* A ConnectionManager
@@ -60,4 +61,6 @@
int numSessions();
void close();
+
+ RemotingConnection getConnection(final int initialRefCount);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -146,7 +146,7 @@
private boolean inFailoverOrReconnect;
private Connector connector;
-
+
private Map<Object, FailedConnectionRunnable> failRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
@@ -164,8 +164,6 @@
debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
}
- public static boolean schedulePingersOneShot = false;
-
// Static
// ---------------------------------------------------------------------------------------
@@ -458,6 +456,13 @@
// Public
// ---------------------------------------------------------------------------------------
+
+ public void cancelPingerForConnectionID(final Object connectionID)
+ {
+ Pinger pinger = pingRunnables.get(connectionID);
+
+ pinger.close();
+ }
// Protected
// ------------------------------------------------------------------------------------
@@ -839,7 +844,7 @@
}
}
- private RemotingConnection getConnection(final int initialRefCount)
+ public RemotingConnection getConnection(final int initialRefCount)
{
RemotingConnection conn;
@@ -941,17 +946,8 @@
{
Pinger pinger = new Pinger(conn);
- Future<?> pingerFuture;
-
- if (schedulePingersOneShot)
- {
- pingerFuture = scheduledThreadPool.schedule(pinger, connectionTTL / 2, TimeUnit.MILLISECONDS);
- }
- else
- {
- pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger, connectionTTL / 2, connectionTTL / 2, TimeUnit.MILLISECONDS);
- }
-
+ Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger, connectionTTL / 2, connectionTTL / 2, TimeUnit.MILLISECONDS);
+
pinger.setFuture(pingerFuture);
pingRunnables.put(conn.getID(), pinger);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -97,9 +97,6 @@
private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
- // For debug
- public static boolean schedulePingersOneShot;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -355,6 +352,13 @@
// Public --------------------------------------------------------
+ public void cancelPingerForConnectionID(final Object connectionID)
+ {
+ Pinger pinger = pingRunnables.get(connectionID);
+
+ pinger.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -397,17 +401,8 @@
{
Pinger pingRunnable = new Pinger(conn);
- Future<?> pingFuture;
+ Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
- if (schedulePingersOneShot)
- {
- pingFuture = scheduledThreadPool.schedule(pingRunnable, 0, TimeUnit.MILLISECONDS);
- }
- else
- {
- pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
- }
-
pingRunnable.setFuture(pingFuture);
pingRunnables.put(conn.getID(), pingRunnable);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -31,6 +31,8 @@
import javax.management.MBeanServer;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManager;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
@@ -67,15 +69,11 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
@@ -187,16 +185,10 @@
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
- private ConnectorFactory backupConnectorFactory;
-
- private Map<String, Object> backupConnectorParams;
-
private RemotingConnection replicatingConnection;
private Channel replicatingChannel;
- private Object replicatingChannelLock = new Object();
-
private final Object initialiseLock = new Object();
private boolean initialised;
@@ -342,6 +334,8 @@
replicatingConnection = null;
replicatingChannel = null;
+
+ replicatingConnectionManager.close();
}
resourceManager.stop();
@@ -618,66 +612,8 @@
}
}
- public Channel getReplicatingChannel()
- {
- synchronized (replicatingChannelLock)
- {
- if (replicatingChannel == null && backupConnectorFactory != null)
- {
- NoCacheConnectionLifeCycleListener listener = new NoCacheConnectionLifeCycleListener();
+ private ConnectionManager replicatingConnectionManager;
- replicatingConnection = (RemotingConnectionImpl)RemotingConnectionImpl.createConnection(backupConnectorFactory,
- backupConnectorParams,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- threadPool,
- listener);
-
- if (replicatingConnection == null)
- {
- return null;
- }
-
- listener.conn = replicatingConnection;
-
- replicatingChannel = replicatingConnection.getChannel(2, -1, false);
-
- replicatingConnection.addFailureListener(new FailureListener()
- {
- public boolean connectionFailed(MessagingException me)
- {
- replicatingChannel.executeOutstandingDelayedResults();
-
- return true;
- }
- });
-
- // First time we get channel we send a message down it informing the backup of our node id -
- // backup and live must have the same node id
-
- Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
-
- final Future future = new Future();
-
- replicatingChannel.replicatePacket(packet, 1, new Runnable()
- {
- public void run()
- {
- future.run();
- }
- });
-
- boolean ok = future.await(10000);
-
- if (!ok)
- {
- throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
- }
- }
- }
-
- return replicatingChannel;
- }
-
public MessagingServerControl getMessagingServerControl()
{
return messagingServerControl;
@@ -1087,6 +1023,11 @@
}
}
+ public Channel getReplicatingChannel()
+ {
+ return replicatingChannel;
+ }
+
private boolean setupReplicatingConnection() throws Exception
{
String backupConnectorName = configuration.getBackupConnectorName();
@@ -1101,36 +1042,67 @@
}
else
{
+ replicatingConnectionManager = new ConnectionManagerImpl(backupConnector,
+ null,
+ false,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD * 3,
+ 0,
+ 1.0d,
+ 0,
+ threadPool,
+ scheduledPool);
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
+ replicatingConnection = replicatingConnectionManager.getConnection(1);
+
+ if (replicatingConnection != null)
{
- Class<?> clz = loader.loadClass(backupConnector.getFactoryClassName());
- backupConnectorFactory = (ConnectorFactory)clz.newInstance();
+ replicatingChannel = replicatingConnection.getChannel(2, -1, false);
+
+ replicatingConnection.addFailureListener(new FailureListener()
+ {
+ public boolean connectionFailed(MessagingException me)
+ {
+ replicatingChannel.executeOutstandingDelayedResults();
+
+ return true;
+ }
+ });
+
+ // First time we get channel we send a message down it informing the backup of our node id -
+ // backup and live must have the same node id
+
+ Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+
+ final Future future = new Future();
+
+ replicatingChannel.replicatePacket(packet, 1, new Runnable()
+ {
+ public void run()
+ {
+ future.run();
+ }
+ });
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
+ }
}
- catch (Exception e)
+ else
{
- throw new IllegalArgumentException("Error instantiating interceptor \"" + backupConnector.getFactoryClassName() +
- "\"",
- e);
- }
+ log.warn("Backup server MUST be started before live server. Initialisation will proceed.");
- backupConnectorParams = backupConnector.getParams();
+ return false;
+ }
}
}
- Channel replicatingChannel = getReplicatingChannel();
-
- if (replicatingChannel == null && backupConnectorFactory != null)
- {
- log.warn("Backup server MUST be started before live server. Initialisation will proceed.");
-
- return false;
- }
- else
- {
- return true;
- }
+ return true;
}
private void loadJournal() throws Exception
@@ -1450,38 +1422,4 @@
}
}
- private class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
- {
- private RemotingConnection conn;
-
- public void connectionCreated(final Connection connection)
- {
- }
-
- public void connectionDestroyed(final Object connectionID)
- {
- if (conn != null)
- {
- conn.destroy();
- }
- }
-
- public void connectionException(final Object connectionID, final MessagingException me)
- {
- backupConnectorFactory = null;
-
- if (conn != null)
- {
- // Execute on different thread to avoid deadlocks
- threadPool.execute(new Runnable()
- {
- public void run()
- {
- conn.fail(me);
- }
- });
- }
- }
- }
-
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -76,7 +77,7 @@
{
final long clientFailureCheckPeriod = 500;
- ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryImpl sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
sf1.setClientFailureCheckPeriod(clientFailureCheckPeriod);
sf1.setConnectionTTL((long)(clientFailureCheckPeriod * 1.5));
@@ -106,7 +107,7 @@
final RemotingConnectionImpl conn1 = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
- //conn1.stopPingingAfterOne();
+ ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).cancelPingerForConnectionID(conn1.getID());
Thread.sleep(3 * clientFailureCheckPeriod);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-06-04 18:04:04 UTC (rev 7198)
@@ -22,24 +22,21 @@
package org.jboss.messaging.tests.integration.remoting;
-import static org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl.schedulePingersOneShot;
-
import java.util.Set;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -223,9 +220,6 @@
ClientSessionFactoryImpl csf = new ClientSessionFactoryImpl(transportConfig);
- //We want to make sure only ping is sent
- ConnectionManagerImpl.schedulePingersOneShot = true;
-
csf.setClientFailureCheckPeriod(PING_INTERVAL);
csf.setConnectionTTL((long)(PING_INTERVAL * 1.5));
@@ -237,6 +231,12 @@
session.addFailureListener(clientListener);
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+
+ // We need to get it to stop pinging
+
+ ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).cancelPingerForConnectionID(conn.getID());
+
RemotingConnection serverConn = null;
while (serverConn == null)
@@ -281,52 +281,17 @@
assertNotNull(serverListener.getException());
session.close();
-
- ConnectionManagerImpl.schedulePingersOneShot = false;
}
/*
* Test the client triggering failure due to no pong received in time
*/
- public void testClientFailureNoServerPing() throws Exception
- {
- Interceptor noPongInterceptor = new Interceptor()
- {
- boolean allowPing = true;
-
- public boolean intercept(Packet packet, RemotingConnection conn) throws MessagingException
- {
- log.info("In interceptor, packet is " + packet.getType());
- if (packet.getType() == PacketImpl.PING)
- {
- if (allowPing)
- {
- log.info("allow 1 ping");
- allowPing = false;
- return true;
- }
- else
- {
- log.info("Ignoring Ping packet.. it will be dropped");
- return false;
- }
- }
- else
- {
- return true;
- }
- }
- };
-
- server.getRemotingService().addInterceptor(noPongInterceptor);
-
+ public void testClientFailureNoPong() throws Exception
+ {
TransportConfiguration transportConfig = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory");
ClientSessionFactory csf = new ClientSessionFactoryImpl(transportConfig);
- //We want to make sure only one server->client ping is sent
- RemotingServiceImpl.schedulePingersOneShot = true;
-
csf.setClientFailureCheckPeriod(PING_INTERVAL);
csf.setConnectionTTL((long)(PING_INTERVAL * 1.5));
@@ -353,25 +318,20 @@
Thread.sleep(10);
}
}
-
+
Listener serverListener = new Listener();
serverConn.addFailureListener(serverListener);
-
+
+ ((RemotingServiceImpl)server.getRemotingService()).cancelPingerForConnectionID(serverConn.getID());
+
Thread.sleep(3 * PING_INTERVAL);
assertNotNull(clientListener.getException());
-
- // We receive an exception on the server in this case too
- assertNotNull(serverListener.getException());
-
+
assertEquals(0, server.getRemotingService().getConnections().size());
- server.getRemotingService().removeInterceptor(noPongInterceptor);
-
session.close();
-
- RemotingServiceImpl.schedulePingersOneShot = false;
}
// Package protected ---------------------------------------------
More information about the jboss-cvs-commits
mailing list