[jboss-cvs] JBoss Messaging SVN: r6252 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 1 03:00:03 EDT 2009
Author: timfox
Date: 2009-04-01 03:00:03 -0400 (Wed, 01 Apr 2009)
New Revision: 6252
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
more tweaks to failover + netty multi thread failover test + disabled createproducer security checks
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -512,13 +512,6 @@
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
-// log.info(System.identityHashCode(this) + " in failover or reconnect, attemptFailover is " +
-// attemptFailover +
-// " failoveronservers:" +
-// failoverOnServerShutdown +
-// " me.getcode " +
-// me.getCode());
-
boolean done = false;
if (attemptFailover || reconnectAttempts != 0)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -1196,14 +1196,16 @@
public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
{
packet.setChannelID(replicatedChannelID);
+
+ boolean runItNow = false;
synchronized (replicationLock)
{
if (playedResponsesOnFailure && action != null)
- {
+ {
// Already replicating channel failed, so just play the action now
-
- action.run();
+
+ runItNow = true;
}
else
{
@@ -1221,6 +1223,13 @@
connection.transportConnection.write(buffer);
}
}
+
+ //Execute outside lock
+
+ if (runItNow)
+ {
+ action.run();
+ }
}
public void executeOutstandingDelayedResults()
@@ -1238,6 +1247,8 @@
private void doExecuteOutstandingDelayedResults()
{
+ List<Runnable> toRun = new ArrayList<Runnable>();
+
synchronized (replicationLock)
{
// Execute all the response actions now
@@ -1248,7 +1259,7 @@
if (action != null)
{
- action.run();
+ toRun.add(action);
}
else
{
@@ -1260,6 +1271,12 @@
this.playedResponsesOnFailure = true;
}
+
+ //Run outside lock
+ for (Runnable action: toRun)
+ {
+ action.run();
+ }
}
public void setHandler(final ChannelHandler handler)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -220,9 +220,9 @@
}
configuration.start();
-
+
storageManager.start();
-
+
securityManager.start();
managementService.start();
@@ -282,7 +282,7 @@
if (uuid == null)
{
uuid = storageManager.getPersistentID();
-
+
if (uuid == null && !configuration.isBackup())
{
uuid = UUIDGenerator.getInstance().generateUUID();
@@ -306,7 +306,7 @@
}
}
-
+
serverManagement = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -435,26 +435,26 @@
if (!configuration.isBackup())
{
- //Once we ready we can start the remoting service so we can start accepting connections
+ // Once we ready we can start the remoting service so we can start accepting connections
remotingService.start();
}
-
+
started = true;
}
public synchronized void start() throws Exception
- {
+ {
if (started)
{
return;
}
-
+
remotingService.setMessagingServer(this);
if (configuration.isBackup())
{
remotingService.start();
-
+
// We defer actual initialisation until the live node has contacted the backup
log.info("Backup server will await live server before becoming operational");
}
@@ -470,20 +470,20 @@
{
return;
}
-
+
if (clusterManager != null)
- {
- clusterManager.stop();
+ {
+ clusterManager.stop();
}
-
+
remotingService.stop();
-
+
managementService.stop();
-
+
storageManager.stop();
-
+
securityManager.stop();
-
+
asyncDeliveryPool.shutdown();
try
@@ -526,7 +526,7 @@
resourceManager = null;
serverManagement = null;
- sessions.clear();
+ sessions.clear();
started = false;
initialised = false;
@@ -643,27 +643,30 @@
return clusterManager;
}
- private synchronized void checkActivate(final RemotingConnection connection)
+ private void checkActivate(final RemotingConnection connection)
{
if (configuration.isBackup())
- {
- freezeBackupConnection();
-
- List<Queue> toActivate = postOffice.activate();
-
- for (Queue queue : toActivate)
+ {
+ synchronized (this)
{
- scheduledExecutor.schedule(new ActivateRunner(queue),
- configuration.getQueueActivationTimeout(),
- TimeUnit.MILLISECONDS);
+ freezeBackupConnection();
+
+ List<Queue> toActivate = postOffice.activate();
+
+ for (Queue queue : toActivate)
+ {
+ scheduledExecutor.schedule(new ActivateRunner(queue),
+ configuration.getQueueActivationTimeout(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ configuration.setBackup(false);
+
+ if (clusterManager != null)
+ {
+ clusterManager.activate();
+ }
}
-
- configuration.setBackup(false);
-
- if (clusterManager != null)
- {
- clusterManager.activate();
- }
}
connection.activate();
@@ -823,7 +826,7 @@
public boolean isInitialised()
{
synchronized (initialiseLock)
- {
+ {
return initialised;
}
}
@@ -843,7 +846,7 @@
{
throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
}
-
+
return;
}
@@ -1072,7 +1075,7 @@
if (config.isDurable())
{
storageManager.addQueueBinding(queueBinding);
- }
+ }
}
}
}
@@ -1171,7 +1174,7 @@
// some other people
// security may be screwed up, on account of thread local security stack
// being corrupted.
-
+
securityStore.authenticate(username, password);
ServerSession currentSession = sessions.remove(name);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -1050,20 +1050,24 @@
}
remotingConnection.removeFailureListener(this);
+
+ //Note. We do not destroy the replicating connection here. In the case the live server has really crashed
+ //then the connection will get cleaned up anyway when the server ping timeout kicks in.
+ //In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
+ //the replicating connection will cause the outstanding responses to be replayed on the live server,
+ //if these reach the client who then subsequently fails over, on reconnection to backup, it will have
+ //received responses that the backup did not know about.
channel.transferConnection(newConnection, this.id, replicatingChannel);
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
- // Destroy the old connection
- remotingConnection.destroy();
-
remotingConnection = newConnection;
remotingConnection.addFailureListener(this);
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
-
+
channel.replayCommands(lastReceivedCommandID, this.id);
if (wasStarted)
@@ -1086,7 +1090,7 @@
{
try
{
- log.info("Connection timed out, so clearing up resources for session " + name);
+ log.warn("Client connection failed, clearing up resources for session " + name);
for (Runnable runner : failureRunners)
{
@@ -1102,7 +1106,7 @@
handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
- log.info("Cleared up resources for session " + name);
+ log.warn("Cleared up resources for session " + name);
}
catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -331,27 +331,28 @@
{
JBossDestination jbd = (JBossDestination)destination;
- if (jbd != null)
- {
- if (jbd instanceof Queue)
- {
- SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
-
- if (!response.isExists())
- {
- throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
- }
- }
- else
- {
- SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
-
- if (!response.isExists())
- {
- throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
- }
- }
- }
+ //TODO Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1565 is complete
+// if (jbd != null)
+// {
+// if (jbd instanceof Queue)
+// {
+// SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
+//
+// if (!response.isExists())
+// {
+// throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
+// }
+// }
+// else
+// {
+// SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
+//
+// if (!response.isExists())
+// {
+// throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
+// }
+// }
+// }
ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -102,6 +102,8 @@
{
for (int its = 0; its < numIts; its++)
{
+ log.info("Beginning iteration " + its);
+
start();
final ClientSessionFactoryInternal sf = createSessionFactory();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -1321,7 +1321,7 @@
final int numThreads,
final boolean failOnCreateConnection) throws Exception
{
- this.runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
+ runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
}
private void runTestMultipleThreads(final RunnableT runnable,
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java 2009-04-01 07:00:03 UTC (rev 6252)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+
+/**
+ * Runs the {@link MultiThreadRandomFailoverTest} scenarios over the Netty transport: a backup server
+ * accepting on {@code DEFAULT_PORT + 1} and a live server whose backup connector points at it.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Feb 2009 08:01:20
+ *
+ */
+public class NettyMultiThreadRandomFailoverTest extends MultiThreadRandomFailoverTest
+{
+   /** Starts the backup server first, then the live server configured with a connector to that backup. */
+   @Override
+   protected void start() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setJMXManagementEnabled(false);
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT + 1);
+      backupConf.getAcceptorConfigurations().clear();
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupServer = Messaging.newNullStorageMessagingServer(backupConf);
+      backupServer.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setJMXManagementEnabled(false); // must target liveConf: the backup was already configured and started above
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations().clear();
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory"));
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory",
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveServer = Messaging.newNullStorageMessagingServer(liveConf);
+      liveServer.start();
+   }
+
+   /** Builds a session factory from a default Netty connector plus a second one created from backupParams. */
+   @Override
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory"),
+                                                                           new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory",
+                                                                                                      backupParams));
+      sf.setSendWindowSize(32 * 1024);
+      return sf;
+   }
+}
More information about the jboss-cvs-commits
mailing list