JBoss hornetq SVN: r8657 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 16:45:29 -0500 (Wed, 09 Dec 2009)
New Revision: 8657
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
Tweak on timer.. only spin when sync = true
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:07:15 UTC (rev 8656)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:45:29 UTC (rev 8657)
@@ -251,9 +251,10 @@
//
// flush();
// }
+
+ timer.resumeSpin();
}
- timer.resumeSpin();
}
15 years
JBoss hornetq SVN: r8656 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 16:07:15 -0500 (Wed, 09 Dec 2009)
New Revision: 8656
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
Tweak to TimedBuffer.
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 20:53:57 UTC (rev 8655)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:07:15 UTC (rev 8656)
@@ -148,7 +148,7 @@
bufferObserver = null;
- timer.close();
+ timer.stop();
if (logRates)
{
@@ -258,8 +258,26 @@
/**
- * Note: Flush could be called by either the CheckTime, or by the Journal directly when moving to a new file
+ * This method will verify if a flush is required.
+ * It is called directly by the CheckTimer.
+ *
+ * @return true means you can pause spinning for a while
* */
+ private synchronized boolean checkFlush()
+ {
+ // delayFlush and pendingSync are changed inside synchronized blocks
+ // They need to be done atomically
+ if (!delayFlush && pendingSync && bufferObserver != null)
+ {
+ flush();
+ return true;
+ }
+ else return !delayFlush;
+ }
+
+ /**
+ * Note: Flush could be called by either the checkFlush (and timer), or by the Journal directly before moving to a new file
+ * */
public synchronized void flush()
{
if (buffer.writerIndex() > 0)
@@ -357,7 +375,7 @@
private class CheckTimer implements Runnable
{
- private volatile boolean closed = false;
+ private volatile boolean stopped = false;
private boolean spinning = false;
@@ -404,7 +422,7 @@
public void run()
{
- while (!closed)
+ while (!stopped)
{
// We flush on the timer if there are pending syncs there and we've waited at least one
// timeout since the time of the last flush
@@ -412,17 +430,11 @@
if (System.nanoTime() > lastFlushTime.get() + timeout)
{
- // delayFlush and pendingSync are changed inside synchronized blocks
- // They need to be done atomically
- synchronized (TimedBuffer.this)
+ if (checkFlush())
{
- if (!delayFlush && pendingSync && bufferObserver != null)
+ if (!stopped)
{
- flush();
- }
- else if (!closed && !delayFlush)
- {
- // if delayFlush is set, it means we have to keep trying, we can't stop spinning on this case
+ // can't pause spin if stopped, or we would hang the thread
pauseSpin();
}
}
@@ -442,9 +454,9 @@
}
}
- public void close()
+ public void stop()
{
- closed = true;
+ stopped = true;
resumeSpin();
}
}
15 years
JBoss hornetq SVN: r8655 - in trunk: tests/src/org/hornetq/tests/integration/jms/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-09 15:53:57 -0500 (Wed, 09 Dec 2009)
New Revision: 8655
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-242
Modified: trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-12-09 20:37:28 UTC (rev 8654)
+++ trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2009-12-09 20:53:57 UTC (rev 8655)
@@ -373,35 +373,42 @@
public Object readObject() throws JMSException
{
checkRead();
- byte type = getBuffer().readByte();
- switch (type)
+ try
{
- case DataConstants.BOOLEAN:
- return getBuffer().readBoolean();
- case DataConstants.BYTE:
- return getBuffer().readByte();
- case DataConstants.SHORT:
- return getBuffer().readShort();
- case DataConstants.CHAR:
- return (char)getBuffer().readShort();
- case DataConstants.INT:
- return getBuffer().readInt();
- case DataConstants.LONG:
- return getBuffer().readLong();
- case DataConstants.FLOAT:
- return Float.intBitsToFloat(getBuffer().readInt());
- case DataConstants.DOUBLE:
- return Double.longBitsToDouble(getBuffer().readLong());
- case DataConstants.STRING:
- return getBuffer().readNullableString();
- case DataConstants.BYTES:
- int len = getBuffer().readInt();
- byte[] bytes = new byte[len];
- getBuffer().readBytes(bytes);
- return bytes;
- default:
- throw new MessageFormatException("Invalid conversion");
+ byte type = getBuffer().readByte();
+ switch (type)
+ {
+ case DataConstants.BOOLEAN:
+ return getBuffer().readBoolean();
+ case DataConstants.BYTE:
+ return getBuffer().readByte();
+ case DataConstants.SHORT:
+ return getBuffer().readShort();
+ case DataConstants.CHAR:
+ return (char)getBuffer().readShort();
+ case DataConstants.INT:
+ return getBuffer().readInt();
+ case DataConstants.LONG:
+ return getBuffer().readLong();
+ case DataConstants.FLOAT:
+ return Float.intBitsToFloat(getBuffer().readInt());
+ case DataConstants.DOUBLE:
+ return Double.longBitsToDouble(getBuffer().readLong());
+ case DataConstants.STRING:
+ return getBuffer().readNullableString();
+ case DataConstants.BYTES:
+ int len = getBuffer().readInt();
+ byte[] bytes = new byte[len];
+ getBuffer().readBytes(bytes);
+ return bytes;
+ default:
+ throw new MessageFormatException("Invalid conversion");
+ }
}
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new MessageEOFException("");
+ }
}
public void writeBoolean(final boolean value) throws JMSException
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java 2009-12-09 20:37:28 UTC (rev 8654)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java 2009-12-09 20:53:57 UTC (rev 8655)
@@ -16,9 +16,11 @@
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.StreamMessage;
import junit.framework.Assert;
@@ -55,6 +57,160 @@
// Public --------------------------------------------------------
+ //https://jira.jboss.org/jira/browse/HORNETQ-242
+ public void testStreamMessageReadsNull() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ Queue queue = createQueue("testQueue");
+
+ try
+ {
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ conn.start();
+
+ StreamMessage msg = sess.createStreamMessage();
+
+ msg.writeInt(1);
+ msg.writeInt(2);
+ msg.writeInt(3);
+
+
+ StreamMessage received = (StreamMessage)sendAndConsumeMessage(msg, prod, cons);
+
+ Assert.assertNotNull(received);
+
+ assertEquals(1, received.readObject());
+ assertEquals(2, received.readObject());
+ assertEquals(3, received.readObject());
+
+ try
+ {
+ received.readObject();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readBoolean();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readByte();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readChar();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readDouble();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readFloat();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readInt();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readLong();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readShort();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ received.readString();
+
+ fail("Should throw exception");
+ }
+ catch (MessageEOFException e)
+ {
+ //Ok
+ }
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
public void testNullProperties() throws Exception
{
Connection conn = cf.createConnection();
15 years
JBoss hornetq SVN: r8654 - in trunk: tests/src/org/hornetq/tests/integration/largemessage/mock and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-09 15:37:28 -0500 (Wed, 09 Dec 2009)
New Revision: 8654
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
Log:
use closeexecutor on invm connector too
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2009-12-09 20:35:20 UTC (rev 8653)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2009-12-09 20:37:28 UTC (rev 8654)
@@ -73,10 +73,13 @@
private volatile boolean started;
protected final OrderedExecutorFactory executorFactory;
+
+ private final Executor closeExecutor;
public InVMConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor threadPool)
{
this.listener = listener;
@@ -84,6 +87,8 @@
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
this.handler = handler;
+
+ this.closeExecutor = closeExecutor;
executorFactory = new OrderedExecutorFactory(threadPool);
@@ -187,28 +192,26 @@
acceptor.disconnect((String)connectionID);
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionDestroyed(connectionID);
- }
- }.start();
+ }
+ });
}
}
public void connectionException(final Object connectionID, final HornetQException me)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionException(connectionID, me);
- }
- }.start();
+ }
+ });
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-09 20:35:20 UTC (rev 8653)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-09 20:37:28 UTC (rev 8654)
@@ -33,11 +33,11 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
- final Executor closExecutor,
+ final Executor closeExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- InVMConnector connector = new InVMConnector(configuration, handler, listener, threadPool);
+ InVMConnector connector = new InVMConnector(configuration, handler, listener, closeExecutor, threadPool);
if (connector.getAcceptor() == null)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2009-12-09 20:35:20 UTC (rev 8653)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2009-12-09 20:37:28 UTC (rev 8654)
@@ -46,7 +46,7 @@
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(configuration, handler, listener, Executors.newCachedThreadPool());
+ super(configuration, handler, listener, Executors.newSingleThreadExecutor(), Executors.newCachedThreadPool());
callback = (MockCallback)configuration.get("callback");
}
15 years
JBoss hornetq SVN: r8653 - in trunk/examples/jms: jmx/src/org/hornetq/jms/example and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 15:35:20 -0500 (Wed, 09 Dec 2009)
New Revision: 8653
Modified:
trunk/examples/jms/client-kickoff/src/org/hornetq/jms/example/ClientKickoffExample.java
trunk/examples/jms/jmx/src/org/hornetq/jms/example/JMXExample.java
trunk/examples/jms/message-counters/src/org/hornetq/jms/example/MessageCounterExample.java
Log:
added explicit cast to MBeanServerInvocationHandler.newProxyInstance() calls
* required for Java 5 compatibility
Modified: trunk/examples/jms/client-kickoff/src/org/hornetq/jms/example/ClientKickoffExample.java
===================================================================
--- trunk/examples/jms/client-kickoff/src/org/hornetq/jms/example/ClientKickoffExample.java 2009-12-09 20:31:09 UTC (rev 8652)
+++ trunk/examples/jms/client-kickoff/src/org/hornetq/jms/example/ClientKickoffExample.java 2009-12-09 20:35:20 UTC (rev 8653)
@@ -78,7 +78,7 @@
ObjectName on = ObjectNameBuilder.DEFAULT.getHornetQServerObjectName();
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap<String, String>());
MBeanServerConnection mbsc = connector.getMBeanServerConnection();
- HornetQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(mbsc,
+ HornetQServerControl serverControl = (HornetQServerControl)MBeanServerInvocationHandler.newProxyInstance(mbsc,
on,
HornetQServerControl.class,
false);
Modified: trunk/examples/jms/jmx/src/org/hornetq/jms/example/JMXExample.java
===================================================================
--- trunk/examples/jms/jmx/src/org/hornetq/jms/example/JMXExample.java 2009-12-09 20:31:09 UTC (rev 8652)
+++ trunk/examples/jms/jmx/src/org/hornetq/jms/example/JMXExample.java 2009-12-09 20:35:20 UTC (rev 8653)
@@ -90,7 +90,7 @@
MBeanServerConnection mbsc = connector.getMBeanServerConnection();
// Step 12. Create a JMSQueueControl proxy to manage the queue on the server
- JMSQueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(mbsc,
+ JMSQueueControl queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc,
on,
JMSQueueControl.class,
false);
Modified: trunk/examples/jms/message-counters/src/org/hornetq/jms/example/MessageCounterExample.java
===================================================================
--- trunk/examples/jms/message-counters/src/org/hornetq/jms/example/MessageCounterExample.java 2009-12-09 20:31:09 UTC (rev 8652)
+++ trunk/examples/jms/message-counters/src/org/hornetq/jms/example/MessageCounterExample.java 2009-12-09 20:35:20 UTC (rev 8653)
@@ -84,7 +84,7 @@
ObjectName on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queue.getQueueName());
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
MBeanServerConnection mbsc = connector.getMBeanServerConnection();
- JMSQueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(mbsc,
+ JMSQueueControl queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc,
on,
JMSQueueControl.class,
false);
15 years
JBoss hornetq SVN: r8652 - trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 15:31:09 -0500 (Wed, 09 Dec 2009)
New Revision: 8652
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
Log:
Fixing compilation issue with JDK 1.5 on JMS-tests
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-09 20:29:24 UTC (rev 8651)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-09 20:31:09 UTC (rev 8652)
@@ -423,7 +423,7 @@
public List<String> listAllSubscribersForTopic(final String s) throws Exception
{
ObjectName objectName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(s);
- TopicControl topic = MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
+ TopicControl topic = (TopicControl)MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
objectName,
TopicControl.class,
false);
15 years
JBoss hornetq SVN: r8651 - in trunk: examples/core/perf/server0 and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-09 15:29:24 -0500 (Wed, 09 Dec 2009)
New Revision: 8651
Modified:
trunk/build-hornetq.xml
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
fixed re-attach ordering issue
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/build-hornetq.xml 2009-12-09 20:29:24 UTC (rev 8651)
@@ -1373,8 +1373,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${test.classes.dir}">
- <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>
- <exclude name="**/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.class"/>
+ <include name="**/org/hornetq/tests/stress/**/*${test-mask}.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 20:29:24 UTC (rev 8651)
@@ -17,7 +17,7 @@
<persistence-enabled>true</persistence-enabled>
- <journal-sync-non-transactional>true</journal-sync-non-transactional>
+ <journal-sync-non-transactional>false</journal-sync-non-transactional>
<journal-sync-transactional>true</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -846,7 +846,6 @@
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
channel.lock();
-
try
{
channel.transferConnection(backupConnection);
@@ -854,9 +853,11 @@
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
+
+ int lcid = channel.getLastConfirmedCommandID();
+
+ Packet request = new ReattachSessionMessage(name, lcid);
- Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
-
Channel channel1 = backupConnection.getChannel(1, -1);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
@@ -865,10 +866,11 @@
{
// The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
+ channel.replayCommands(response.getLastConfirmedCommandID(), channel.getID());
}
else
{
+
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session won't exist or the target server has been restarted - in this case the session will need to be
// recreated,
@@ -994,6 +996,7 @@
channel.returnBlocking();
}
+ channel.setTransferring(false);
}
catch (Throwable t)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -899,26 +899,20 @@
Connection tc = null;
try
- {
- if (connector == null)
- {
- DelegatingBufferHandler handler = new DelegatingBufferHandler();
+ {
+ DelegatingBufferHandler handler = new DelegatingBufferHandler();
- connector = connectorFactory.createConnector(transportParams,
- handler,
- this,
- closeExecutor,
- threadPool,
- scheduledThreadPool);
+ connector = connectorFactory.createConnector(transportParams,
+ handler,
+ this,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool);
- if (connector != null)
- {
- connector.start();
- }
- }
-
if (connector != null)
{
+ connector.start();
+
tc = connector.createConnection();
if (tc == null)
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -39,9 +39,9 @@
void transferConnection(RemotingConnection newConnection);
- void replayCommands(int lastReceivedCommandID, final long newID);
+ void replayCommands(int lastConfirmedCommandID, final long newID);
- int getLastReceivedCommandID();
+ int getLastConfirmedCommandID();
void lock();
@@ -64,4 +64,6 @@
void clearCommands();
int getConfirmationWindowSize();
+
+ void setTransferring(boolean transferring);
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -50,7 +50,7 @@
private volatile int firstStoredCommandID;
- private volatile int lastReceivedCommandID = -1;
+ private volatile int lastConfirmedCommandID = -1;
private volatile RemotingConnection connection;
@@ -73,7 +73,9 @@
private int receivedBytes;
private CommandConfirmationHandler commandConfirmationHandler;
-
+
+ private volatile boolean transferring;
+
public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -97,9 +99,9 @@
return id;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public Lock getLock()
@@ -140,6 +142,11 @@
{
send(packet, false);
}
+
+ public void setTransferring(boolean transferring)
+ {
+ this.transferring = transferring;
+ }
// This must never be called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
@@ -147,11 +154,11 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
final HornetQBuffer buffer = packet.encode(connection);
lock.lock();
-
+
try
{
while (failingOver)
@@ -165,6 +172,13 @@
{
}
}
+
+ //Sanity check
+ if (transferring)
+ {
+ throw new IllegalStateException("Cannot send a packet while channel is doing failover");
+ }
+
if (resendCache != null && packet.isRequiresConfirmations())
{
@@ -197,7 +211,7 @@
synchronized (sendBlockingLock)
{
packet.setChannelID(id);
-
+
final HornetQBuffer buffer = packet.encode(connection);
lock.lock();
@@ -306,7 +320,7 @@
closed = true;
}
-
+
public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
@@ -322,19 +336,21 @@
rnewConnection.putChannel(id, this);
connection = rnewConnection;
+
+ transferring = true;
}
}
- public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
+ public void replayCommands(final int otherLastConfirmedCommandID, final long newChannelID)
{
if (resendCache != null)
{
- clearUpTo(otherLastReceivedCommandID);
+ clearUpTo(otherLastConfirmedCommandID);
for (final Packet packet : resendCache)
{
packet.setChannelID(newChannelID);
-
+
doWrite(packet);
}
}
@@ -372,7 +388,7 @@
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -384,7 +400,7 @@
{
if (resendCache != null && packet.isRequiresConfirmations())
{
- lastReceivedCommandID++;
+ lastConfirmedCommandID++;
receivedBytes += packet.getPacketSize();
@@ -392,7 +408,7 @@
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
confirmed.setChannelID(id);
@@ -405,14 +421,14 @@
{
if (resendCache != null)
{
- lastReceivedCommandID = -1;
+ lastConfirmedCommandID = -1;
firstStoredCommandID = 0;
resendCache.clear();
}
}
-
+
public void handlePacket(final Packet packet)
{
if (packet.getType() == PacketImpl.PACKETS_CONFIRMED)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionMessage.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -30,19 +30,19 @@
private String name;
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+ public ReattachSessionMessage(final String name, final int lastConfirmedCommandID)
{
super(PacketImpl.REATTACH_SESSION);
this.name = name;
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
}
public ReattachSessionMessage()
@@ -57,23 +57,23 @@
return name;
}
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(name);
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
name = buffer.readString();
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
}
@Override
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -28,7 +28,7 @@
// Attributes ----------------------------------------------------
- private int lastReceivedCommandID;
+ private int lastConfirmedCommandID;
private boolean reattached;
@@ -36,11 +36,11 @@
// Constructors --------------------------------------------------
- public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean reattached)
+ public ReattachSessionResponseMessage(final int lastConfirmedCommandID, final boolean reattached)
{
super(PacketImpl.REATTACH_SESSION_RESP);
- this.lastReceivedCommandID = lastReceivedCommandID;
+ this.lastConfirmedCommandID = lastConfirmedCommandID;
this.reattached = reattached;
}
@@ -52,9 +52,9 @@
// Public --------------------------------------------------------
- public int getLastReceivedCommandID()
+ public int getLastConfirmedCommandID()
{
- return lastReceivedCommandID;
+ return lastConfirmedCommandID;
}
public boolean isReattached()
@@ -65,14 +65,14 @@
@Override
public void encodeRest(final HornetQBuffer buffer)
{
- buffer.writeInt(lastReceivedCommandID);
+ buffer.writeInt(lastConfirmedCommandID);
buffer.writeBoolean(reattached);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
- lastReceivedCommandID = buffer.readInt();
+ lastConfirmedCommandID = buffer.readInt();
reattached = buffer.readBoolean();
}
@@ -92,7 +92,7 @@
ReattachSessionResponseMessage r = (ReattachSessionResponseMessage)other;
- return super.equals(other) && lastReceivedCommandID == r.lastReceivedCommandID;
+ return super.equals(other) && lastConfirmedCommandID == r.lastConfirmedCommandID;
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -70,7 +70,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
/** The journal at the backup server has to be equivalent as the journal used on the live node.
* Or else the backup node is out of sync. */
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -42,5 +42,9 @@
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
- void forceDelivery(long sequence);
+ void forceDelivery(long sequence);
+
+ void setTransferring(boolean transferring);
}
+
+
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -150,7 +150,7 @@
try
{
- response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+ response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -539,7 +539,7 @@
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
- final int lastReceivedCommandID) throws Exception
+ final int lastConfirmedCommandID) throws Exception
{
if (!started)
{
@@ -581,9 +581,9 @@
else
{
// Reconnect the channel to the new connection
- int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
+ int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
- return new ReattachSessionResponseMessage(serverLastReceivedCommandID, true);
+ return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -120,7 +120,10 @@
private final ManagementService managementService;
private final Binding binding;
+
+ private boolean transferring = false;
+
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -197,8 +200,8 @@
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
- if (!started)
- {
+ if (!started || transferring)
+ {
return HandleStatus.BUSY;
}
@@ -413,10 +416,27 @@
promptDelivery(true);
}
}
-
+
+ public void setTransferring(final boolean transferring)
+ {
+ lock.lock();
+ try
+ {
+ this.transferring = transferring;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ if (!transferring)
+ {
+ promptDelivery(true);
+ }
+ }
+
public void receiveCredits(final int credits) throws Exception
{
-
if (credits == -1)
{
// No flow control
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 20:24:41 UTC (rev 8650)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 20:29:24 UTC (rev 8651)
@@ -1596,14 +1596,14 @@
}
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
- {
- boolean wasStarted = started;
+ {
+ //We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get delivered
+ //after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
+ //sequence of packets.
+ //It is not sufficient to just stop the session, since right after stopping the session, another session start might be executed
+ //before we have transferred the connection, leaving it in a started state
+ setTransferring(true);
- if (wasStarted)
- {
- setStarted(false);
- }
-
remotingConnection.removeFailureListener(this);
remotingConnection.removeCloseListener(this);
@@ -1613,9 +1613,9 @@
// the replicating connection will cause the outstanding responses to be be replayed on the live server,
// if these reach the client who then subsequently fails over, on reconnection to backup, it will have
// received responses that the backup did not know about.
-
+
channel.transferConnection(newConnection);
-
+
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = newConnection;
@@ -1623,15 +1623,14 @@
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
- int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+ int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
channel.replayCommands(lastReceivedCommandID, id);
+
+ channel.setTransferring(false);
+
+ setTransferring(false);
- if (wasStarted)
- {
- setStarted(true);
- }
-
return serverLastReceivedCommandID;
}
@@ -1807,7 +1806,17 @@
started = s;
}
+
+ private void setTransferring(final boolean transferring)
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.setTransferring(transferring);
+ }
+ }
+
/**
* We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
* which is stored on the header
15 years
JBoss hornetq SVN: r8650 - trunk.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 15:24:41 -0500 (Wed, 09 Dec 2009)
New Revision: 8650
Modified:
trunk/build-hornetq.xml
Log:
Using the same logging properties as we use on the regular testsuite
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-12-09 19:39:24 UTC (rev 8649)
+++ trunk/build-hornetq.xml 2009-12-09 20:24:41 UTC (rev 8650)
@@ -1361,7 +1361,7 @@
<sysproperty key="java.io.tmpdir" value="${java.io.tmpdir}"/>
<jvmarg value="-Xmx1024M"/>
<jvmarg value="-Dmodule.output=./"/>
- <jvmarg value="-Djava.util.logging.config.file=src/config/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=src/config/trunk/non-clustered/logging.properties"/>
<jvmarg value="-Djava.library.path=native/bin"/>
<!--<jvmarg line="-Xmx512M -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>-->
15 years
JBoss hornetq SVN: r8649 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 14:39:24 -0500 (Wed, 09 Dec 2009)
New Revision: 8649
Modified:
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
Log:
Tweaks to finalize() to avoid exceptions when finalization runs while close() has not yet finished.
* Some weird behaviour observed on this thread: http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4269860#4269860
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-09 19:00:06 UTC (rev 8648)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-09 19:39:24 UTC (rev 8649)
@@ -50,6 +50,8 @@
private final ClientSessionInternal session;
private final Exception creationStack;
+
+ private volatile boolean closed;
private static Set<DelegatingSession> sessions = new ConcurrentHashSet<DelegatingSession>();
@@ -68,7 +70,9 @@
@Override
protected void finalize() throws Throwable
{
- if (!session.isClosed())
+ // In some scenarios we have seen the JDK finalizing the DelegatingSession while the call to session.close() was still in progress
+ //
+ if (!closed && !session.isClosed())
{
DelegatingSession.log.warn("I'm closing a core ClientSession you left open. Please make sure you close all ClientSessions explicitly " + "before letting them go out of scope! " +
System.identityHashCode(this));
@@ -130,6 +134,8 @@
public void close() throws HornetQException
{
+ closed = true;
+
if (DelegatingSession.debug)
{
DelegatingSession.sessions.remove(this);
15 years
JBoss hornetq SVN: r8648 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 14:00:06 -0500 (Wed, 09 Dec 2009)
New Revision: 8648
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
Log:
failover tests for symmetric cluster
* added test when failing backup nodes
* added test to fail both live and activated backup nodes
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2009-12-09 18:59:58 UTC (rev 8647)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2009-12-09 19:00:06 UTC (rev 8648)
@@ -75,7 +75,7 @@
return false;
}
- public void testFailAllNodes() throws Exception
+ public void testFailLiveNodes() throws Exception
{
setupCluster();
@@ -187,7 +187,110 @@
ClusterWithBackupFailoverTestBase.log.info("*** test done");
}
+
+ public void testFailBackupNodes() throws Exception
+ {
+ setupCluster();
+ startServers(3, 4, 5, 0, 1, 2);
+
+ setupSessionFactory(0, 3, isNetty(), false);
+ setupSessionFactory(1, 4, isNetty(), false);
+ setupSessionFactory(2, 5, isNetty(), false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(3);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(4);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(5);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+
+
+ removeConsumer(0);
+ removeConsumer(1);
+ removeConsumer(2);
+
+ stopServers();
+
+ ClusterWithBackupFailoverTestBase.log.info("*** test done");
+ }
+
protected void setupCluster() throws Exception
{
setupCluster(false);
@@ -224,15 +327,141 @@
// Prevent remoting service taking any more connections
server.getRemotingService().freeze();
- // Stop it broadcasting
- for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
+ if (server.getClusterManager() != null)
{
- group.stop();
+ // Stop it broadcasting
+ for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
+ {
+ group.stop();
+ }
}
-
+
FailoverManagerImpl.failAllConnectionsForConnector(serverTC);
server.stop();
}
+ public void testFailAllNodes() throws Exception
+ {
+ setupCluster();
+
+ startServers(3, 4, 5, 0, 1, 2);
+
+ setupSessionFactory(0, 3, isNetty(), false);
+ setupSessionFactory(1, 4, isNetty(), false);
+ setupSessionFactory(2, 5, isNetty(), false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(0);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+
+ ClusterWithBackupFailoverTestBase.log.info("** now sending");
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ removeConsumer(0);
+ failNode(3);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
+
+ failNode(1);
+
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ // activated backup nodes
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+ // activated backup nodes
+ waitForBindings(4, "queues.testaddress", 1, 1, false);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
+
+ removeConsumer(1);
+ failNode(4);
+
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 1, 0, false);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
+
+ failNode(2);
+
+ // live nodes
+ waitForBindings(5, "queues.testaddress", 1, 1, true);
+ // live nodes
+ waitForBindings(5, "queues.testaddress", 0, 0, false);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
+
+ removeConsumer(2);
+ failNode(5);
+
+ stopServers();
+
+ ClusterWithBackupFailoverTestBase.log.info("*** test done");
+ }
}
15 years