[jboss-cvs] JBoss Messaging SVN: r5277 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 5 07:54:32 EST 2008


Author: timfox
Date: 2008-11-05 07:54:32 -0500 (Wed, 05 Nov 2008)
New Revision: 5277

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Log:
Failover of scheduled messages


Modified: trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	2008-11-05 12:54:32 UTC (rev 5277)
@@ -34,7 +34,9 @@
 
    int getScheduledCount();
 
-   List<MessageReference> getScheduledMessages();
+   List<MessageReference> getScheduledReferences();
 
    List<MessageReference> cancel();
+   
+   MessageReference removeReferenceWithID(long id);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-05 12:54:32 UTC (rev 5277)
@@ -257,6 +257,12 @@
             break;
          }
       }
+      
+      if (removed == null)
+      {
+         //Look in scheduled deliveries
+         removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+      }
 
       return removed;
    }
@@ -305,7 +311,7 @@
 
    public synchronized List<MessageReference> getScheduledMessages()
    {
-      return scheduledDeliveryHandler.getScheduledMessages();
+      return scheduledDeliveryHandler.getScheduledReferences();
    }
 
    public int getDeliveringCount()
@@ -526,6 +532,8 @@
                   " so queue will be activated now");
 
          backup = false;
+         
+         scheduledDeliveryHandler.reSchedule();
 
          deliverAsync(executor);
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-11-05 12:37:47 UTC (rev 5276)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-11-05 12:54:32 UTC (rev 5277)
@@ -22,9 +22,9 @@
 package org.jboss.messaging.core.server.impl;
 
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -36,7 +36,10 @@
 /**
  * Handles scheduling deliveries to a queue at the correct time.
  * 
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  */
 public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
 {
@@ -46,7 +49,9 @@
 
    private final ScheduledExecutorService scheduledExecutor;
 
-   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+   private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
+   
+   private boolean rescheduled;
 
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
    {
@@ -66,7 +71,10 @@
 
          ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
 
-         scheduledRunnables.add(runnable);
+         synchronized (scheduledRunnables)
+         {
+            scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
+         }
 
          if (!backup)
          {
@@ -77,12 +85,20 @@
       }
       return false;
    }
-
+   
    public void reSchedule()
    {
-      for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+      synchronized (scheduledRunnables)
       {
-         scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+         if (!rescheduled)
+         {
+            for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
+            {
+               scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+            }
+            
+            rescheduled = true;
+         }
       }
    }
 
@@ -91,12 +107,13 @@
       return scheduledRunnables.size();
    }
 
-   public List<MessageReference> getScheduledMessages()
+   public List<MessageReference> getScheduledReferences()
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
+      
       synchronized (scheduledRunnables)
       {
-         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
          {
             refs.add(scheduledRunnable.getReference());
          }
@@ -107,11 +124,13 @@
    public List<MessageReference> cancel()
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
+      
       synchronized (scheduledRunnables)
       {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
          {
             runnable.cancel();
+            
             refs.add(runnable.getReference());
          }
 
@@ -119,6 +138,14 @@
       }
       return refs;
    }
+   
+   public MessageReference removeReferenceWithID(long id) // returns null when nothing is scheduled for id
+   {
+      synchronized (scheduledRunnables) // the lookup and the removal below must happen as one step
+      {
+         return scheduledRunnables.containsKey(id) ? scheduledRunnables.remove(id).getReference() : null;
+      }
+   }
 
    private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
    {
@@ -180,9 +207,9 @@
 
          synchronized (scheduledRunnables)
          {
-            boolean removed = scheduledRunnables.remove(this);
+            Object removed = scheduledRunnables.remove(ref.getMessage().getMessageID());
 
-            if (!removed)
+            if (removed == null)
             {
                log.warn("Failed to remove timeout " + this);
 
@@ -194,7 +221,6 @@
          // TODO - need to replicate this so backup node also adds back to
          // front of queue
          ref.getQueue().addFirst(ref);
-
       }
    }
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java	2008-11-05 12:54:32 UTC (rev 5277)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test for failover of scheduled messages: messages scheduled on the live
+ * server must all be received, without duplicates, when the connection fails mid-schedule.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 5 Nov 2008 11:18:51
+ *
+ */
+public class FailoverScheduledMessageTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(FailoverScheduledMessageTest.class);
+
+   // Constants -----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /*
+    * Send some scheduled messages on live
+    * Let some fire on live
+    * Failover
+    * Let rest fire on backup
+    * Assert no duplicates and all are received ok
+    */
+   public void testScheduled() throws Exception
+   {            
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+      
+      sf1.setSendWindowSize(32 * 1024);
+  
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+      
+      session1.start();
+
+      ClientProducer producer = session1.createProducer(ADDRESS);
+                 
+      final int numMessages = 10;
+      
+      long now = System.currentTimeMillis();
+      
+      final long delay = 200;
+      // Stagger the scheduled delivery times: message i is due at now + delay * i
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);         
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message, now + delay * i);                
+      }
+      
+      ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+                 
+      final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+      // Fail the live connection from a second thread roughly halfway through the schedule
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               //Sleep a little while to ensure that some messages are consumed before failover
+               Thread.sleep(delay * numMessages / 2);
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt(); // keep the interrupt visible; the connection is still failed below
+            }
+            conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+         }
+      };
+      
+      t.start();
+      // Every message must arrive, whether it was due before or after the connection failure
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(delay * 2);
+                           
+         assertNotNull(message);
+         
+         log.info("got message " + i);
+         
+         message.acknowledge();
+      }      
+      // Nothing further may arrive: a duplicate delivery would show up here
+      ClientMessage message = consumer1.receive(delay * 2);
+      
+      assertNull(message);
+      
+      t.join();
+                   
+      session1.close();
+      
+      //Make sure no more messages
+      ClientSession session2 = sf1.createSession(false, true, true, false);
+      
+      session2.start();
+      
+      ClientConsumer consumer2 = session2.createConsumer(ADDRESS);
+      
+      message = consumer2.receive(1000);
+      
+      assertNull(message);
+      
+      session2.close();      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+




More information about the jboss-cvs-commits mailing list