[jboss-cvs] JBoss Messaging SVN: r5277 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 5 07:54:32 EST 2008
Author: timfox
Date: 2008-11-05 07:54:32 -0500 (Wed, 05 Nov 2008)
New Revision: 5277
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Log:
Failover of scheduled messages
Modified: trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java 2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java 2008-11-05 12:54:32 UTC (rev 5277)
@@ -34,7 +34,9 @@
int getScheduledCount();
- List<MessageReference> getScheduledMessages();
+ List<MessageReference> getScheduledReferences();
List<MessageReference> cancel();
+
+ MessageReference removeReferenceWithID(long id);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-11-05 12:54:32 UTC (rev 5277)
@@ -257,6 +257,12 @@
break;
}
}
+
+ if (removed == null)
+ {
+ //Look in scheduled deliveries
+ removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+ }
return removed;
}
@@ -305,7 +311,7 @@
public synchronized List<MessageReference> getScheduledMessages()
{
- return scheduledDeliveryHandler.getScheduledMessages();
+ return scheduledDeliveryHandler.getScheduledReferences();
}
public int getDeliveringCount()
@@ -526,6 +532,8 @@
" so queue will be activated now");
backup = false;
+
+ scheduledDeliveryHandler.reSchedule();
deliverAsync(executor);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-11-05 12:54:32 UTC (rev 5277)
@@ -22,9 +22,9 @@
package org.jboss.messaging.core.server.impl;
import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,7 +36,10 @@
/**
* Handles scheduling deliveries to a queue at the correct time.
*
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="clebert.suconic at jboss.com">Clebert Suconic</a>
*/
public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
{
@@ -46,7 +49,9 @@
private final ScheduledExecutorService scheduledExecutor;
- private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+ private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
+
+ private boolean rescheduled;
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
{
@@ -66,7 +71,10 @@
ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
- scheduledRunnables.add(runnable);
+ synchronized (scheduledRunnables)
+ {
+ scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
+ }
if (!backup)
{
@@ -77,12 +85,20 @@
}
return false;
}
-
+
public void reSchedule()
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+ synchronized (scheduledRunnables)
{
- scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+ if (!rescheduled)
+ {
+ for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
+ {
+ scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+ }
+
+ rescheduled = true;
+ }
}
}
@@ -91,12 +107,13 @@
return scheduledRunnables.size();
}
- public List<MessageReference> getScheduledMessages()
+ public List<MessageReference> getScheduledReferences()
{
List<MessageReference> refs = new ArrayList<MessageReference>();
+
synchronized (scheduledRunnables)
{
- for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+ for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
{
refs.add(scheduledRunnable.getReference());
}
@@ -107,11 +124,13 @@
public List<MessageReference> cancel()
{
List<MessageReference> refs = new ArrayList<MessageReference>();
+
synchronized (scheduledRunnables)
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+ for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
{
runnable.cancel();
+
refs.add(runnable.getReference());
}
@@ -119,6 +138,14 @@
}
return refs;
}
+
+ public MessageReference removeReferenceWithID(long id)
+ {
+ synchronized (scheduledRunnables)
+ {
+ return scheduledRunnables.remove(id).getReference();
+ }
+ }
private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
{
@@ -180,9 +207,9 @@
synchronized (scheduledRunnables)
{
- boolean removed = scheduledRunnables.remove(this);
+ Object removed = scheduledRunnables.remove(ref.getMessage().getMessageID());
- if (!removed)
+ if (removed == null)
{
log.warn("Failed to remove timeout " + this);
@@ -194,7 +221,6 @@
// TODO - need to replicate this so backup node also adds back to
// front of queue
ref.getQueue().addFirst(ref);
-
}
}
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java 2008-11-05 12:54:32 UTC (rev 5277)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A FailoverScheduledMessageTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 5 Nov 2008 11:18:51
+ *
+ */
+public class FailoverScheduledMessageTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /*
+ * Send some scheduled messsages on live
+ * Let some fire on live
+ * Failover
+ * Let rest fire on backup
+ * Assert no duplicates and all are received ok
+ */
+ public void testScheduled() throws Exception
+ {
+ ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ sf1.setSendWindowSize(32 * 1024);
+
+ ClientSession session1 = sf1.createSession(false, true, true, false);
+
+ session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ session1.start();
+
+ ClientProducer producer = session1.createProducer(ADDRESS);
+
+ final int numMessages = 10;
+
+ long now = System.currentTimeMillis();
+
+ final long delay = 200;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message, now + delay * i);
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+
+ final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ //Sleep a little while to ensure that some messages are consumed before failover
+ Thread.sleep(delay * numMessages / 2);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+ }
+ };
+
+ t.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(delay * 2);
+
+ assertNotNull(message);
+
+ log.info("got message " + i);
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer1.receive(delay * 2);
+
+ assertNull(message);
+
+ t.join();
+
+ session1.close();
+
+ //Make sure no more messages
+ ClientSession session2 = sf1.createSession(false, true, true, false);
+
+ session2.start();
+
+ ClientConsumer consumer2 = session2.createConsumer(ADDRESS);
+
+ message = consumer2.receive(1000);
+
+ assertNull(message);
+
+ session2.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+ backupService.stop();
+
+ assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+ liveService.stop();
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
+
More information about the jboss-cvs-commits
mailing list