[jboss-cvs] JBoss Messaging SVN: r2495 - in trunk: docs/examples/common/src/org/jboss/example/jms/common and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 28 09:31:20 EST 2007


Author: timfox
Date: 2007-02-28 09:31:19 -0500 (Wed, 28 Feb 2007)
New Revision: 2495

Added:
   trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
Modified:
   trunk/.classpath
   trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java
   trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-904



Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/.classpath	2007-02-28 14:31:19 UTC (rev 2495)
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
+	<classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
 	<classpathentry kind="src" path="output/gen-parsers"/>
 	<classpathentry kind="src" path="docs/examples/common/src"/>
 	<classpathentry kind="src" path="docs/examples/distributed-topic/src"/>

Modified: trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java
===================================================================
--- trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -184,6 +184,8 @@
       setup(null);
    }
    
+   
+   
    protected void setup(InitialContext ic) throws Exception
    {
       String destinationName;

Modified: trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java
===================================================================
--- trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -21,6 +21,7 @@
 */
 package org.jboss.example.jms.common;
 
+import javax.management.Attribute;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 import javax.naming.InitialContext;
@@ -65,7 +66,34 @@
    {
        deployQueue(jndiName,null);
    }
+   
+   public static void activateMessagePullPolicy(InitialContext ic) throws Exception
+   {
+      //Programmatically activates the default message pull policy (DefaultMessagePullPolicy)
+      //on the post office MBean, reached through the MBean server proxy looked up with ic.
+      
+      //We need to do this here since the default config ships with the NullMessagePullPolicy which
+      //doesn't do message redistribution
+      
+      //You won't have to do this in your own programs - you just need to make sure
+      //your postoffice MBean config specifies the DefaultMessagePullPolicy
+      
+      MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
+      ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
 
+      //Stop the post office before changing its pull policy
+      mBeanServer.invoke(postOfficeObjectName, "stop", null, null);
+      
+      Attribute att = new Attribute("MessagePullPolicy", "org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy");
+      
+      mBeanServer.setAttribute(postOfficeObjectName, att);
+
+      //Restart the post office
+      
+      mBeanServer.invoke(postOfficeObjectName, "start", null, null);
+      
+   }
+
    public static void deployQueue(String jndiName, InitialContext ic) throws Exception
    {
       MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -505,7 +505,7 @@
    /**
     * Needed for failover
     */
-   public void synchronizeWith(MessageCallbackHandler newHandler, QueuedExecutor sessionExecutor)
+   public void synchronizeWith(MessageCallbackHandler newHandler)
    {
       consumerID = newHandler.consumerID;
 
@@ -517,11 +517,8 @@
 
       buffer.clear();
       
-      this.sessionExecutor = sessionExecutor;
-      
       // need to reset toggle state
-      serverSending = true;
-      
+      serverSending = true;      
    }
    
    public long getLastDeliveryId()

Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -138,10 +138,8 @@
 
       MessageCallbackHandler handler = oldCallbackManager.unregisterHandler(oldConsumerID);
       MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(consumerID);
-
-      SessionState sstate = (SessionState)this.getParent();
-      
-      handler.synchronizeWith(newHandler, sstate.getExecutor());
+   
+      handler.synchronizeWith(newHandler);
       newCallbackManager.registerHandler(consumerID, handler);
    }
 

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -47,6 +47,7 @@
 import org.jboss.jms.tx.MessagingXAResource;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.util.ClearableQueuedExecutor;
 
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -84,7 +85,7 @@
    private Object currentTxId;
 
    // Executor used for executing onMessage methods
-   private QueuedExecutor executor;
+   private ClearableQueuedExecutor executor;
 
    private boolean recoverCalled;
    
@@ -133,7 +134,7 @@
          currentTxId = parent.getResourceManager().createLocalTx();
       }
 
-      createExecutor();
+      executor = new ClearableQueuedExecutor(new LinkedQueue());
 
       clientAckList = new ArrayList();
 
@@ -201,10 +202,8 @@
       // We need to clear anything waiting in the session executor - since there may be messages
       // from before failover waiting in there and we don't want them to get delivered after
       // failover.
-      executor.shutdownAfterProcessingCurrentTask();
+      executor.clearAllExceptCurrentTask();
       
-      createExecutor();
-
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
       for (Iterator i = getChildren().iterator(); i.hasNext(); )
@@ -268,6 +267,9 @@
 
       if (!isTransacted() || (isXA() && getCurrentTxId() == null))
       {
+         // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
+         // since xa sessions no longer fall back to non transacted
+         
          // Non transacted session or an XA session with no transaction set (it falls back
          // to AUTO_ACKNOWLEDGE)
 
@@ -448,12 +450,6 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private void createExecutor()
-   {
-      executor = new QueuedExecutor(new LinkedQueue());
-   }
-
-   
    // Inner classes --------------------------------------------------------------------------------
 
 }

Added: trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A ClearableQueuedExecutor
+ * 
+ * This class extends the QueuedExecutor with a method to clear all but the currently
+ * executing task without shutting it down.
+ * 
+ * We need this functionality when failing over a session.
+ * 
+ * In that case we need to clear all tasks apart from the currently executing one.
+ * 
+ * We can't just call shutdownAfterProcessingCurrentTask and then use another instance
+ * after failover since, when failover resumes, the current task and the next delivery
+ * would be executed on different threads and smack into each other.
+ * 
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-904
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ClearableQueuedExecutor extends QueuedExecutor
+{
+   public ClearableQueuedExecutor()
+   {
+   }
+
+   public ClearableQueuedExecutor(Channel channel)
+   {
+      super(channel);
+   }
+
+   // Discards the entries still waiting in queue_ by calling poll(0) until it returns null.
+   public void clearAllExceptCurrentTask()
+   {
+      try
+      { 
+        while (queue_.poll(0) != null); // empty body on purpose: the poll call itself does the work
+      }
+      catch (InterruptedException ex)
+      {
+        Thread.currentThread().interrupt(); // re-set the interrupt flag instead of swallowing it
+      }
+   }
+}
+

Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-02-28 14:31:19 UTC (rev 2495)
@@ -35,11 +35,10 @@
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 
-import junit.framework.TestCase;
-
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.logging.Logger;
 import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.util.ProxyAssertSupport;
 import org.jboss.tm.TransactionManagerService;
 
 /**
@@ -53,7 +52,7 @@
  * @version <tt>$Revision$</tt>
  * $Id$
  */
-public class MessagingTestCase extends TestCase
+public class MessagingTestCase extends ProxyAssertSupport 
 {
    // Constants -----------------------------------------------------
 




More information about the jboss-cvs-commits mailing list