[jboss-cvs] JBossAS SVN: r58404 - in trunk: jbossmq/src/main/org/jboss/mq testsuite/src/main/org/jboss/test/jbossmq

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 15 11:53:06 EST 2006


Author: adrian at jboss.org
Date: 2006-11-15 11:52:59 -0500 (Wed, 15 Nov 2006)
New Revision: 58404

Modified:
   trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
   trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java
Log:
[JBAS-3821] - Error from server during receive corrupts internal receiving state.

Modified: trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
===================================================================
--- trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java	2006-11-15 16:14:37 UTC (rev 58403)
+++ trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java	2006-11-15 16:52:59 UTC (rev 58404)
@@ -1,24 +1,24 @@
 /*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.mq;
 
 import java.util.LinkedList;
@@ -268,84 +268,80 @@
             log.trace("receive() " + this);
       }
 
-      synchronized (messages)
+      try
       {
-         //see if we have any undelivered messages before we go to the JMS
-         //server to look.
-         Message message = getMessage();
-         if (message != null)
+         synchronized (messages)
          {
-            synchronized (stateLock)
+            //see if we have any undelivered messages before we go to the JMS
+            //server to look.
+            Message message = getMessage();
+            if (message != null)
             {
-               receiving = false;
-               
                if (trace)
                   log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
+               return message;
             }
-            return message;
-         }
-         
-         // Loop through expired messages
-         while (true)
-         {
-            SpyMessage msg = session.connection.receive(subscription, 0);
-            if (msg != null)
+            
+            // Loop through expired messages
+            while (true)
             {
-               Message mes = preProcessMessage(msg);
-               if (mes != null)
+               SpyMessage msg = session.connection.receive(subscription, 0);
+               if (msg != null)
                {
-                  synchronized (stateLock)
+                  Message mes = preProcessMessage(msg);
+                  if (mes != null)
                   {
-                     receiving = false;
-                     
                      if (trace)
                         log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
+                     return mes;
                   }
-                  return mes;
                }
+               else
+                  break;
             }
-            else
-               break;
-         }
 
-         if (trace)
-            log.trace("No message in receive(), waiting " + this);
-         
-         try
-         {
-            waitingForMessage = true;
-            while (true)
+            if (trace)
+               log.trace("No message in receive(), waiting " + this);
+            
+            try
             {
-               if (isClosed())
+               waitingForMessage = true;
+               while (true)
                {
-                  if (trace)
-                     log.trace("Consumer closed in receive() " + this);
-                  return null;
+                  if (isClosed())
+                  {
+                     if (trace)
+                        log.trace("Consumer closed in receive() " + this);
+                     return null;
+                  }
+                  Message mes = getMessage();
+                  if (mes != null)
+                  {
+                     if (trace)
+                        log.trace("receive() message from list after wait " + this);
+                     return mes;
+                  }
+                  messages.wait();
                }
-               Message mes = getMessage();
-               if (mes != null)
-               {
-                  if (trace)
-                     log.trace("receive() message from list after wait " + this);
-                  return mes;
-               }
-               messages.wait();
             }
-         }
-         catch (Throwable t)
-         {
-            SpyJMSException.rethrowAsJMSException("Receive interupted", t);
-            throw new UnreachableStatementException();
-         }
-         finally
-         {
-            waitingForMessage = false;
-            synchronized (stateLock)
+            catch (Throwable t)
             {
-               receiving = false;
+               SpyJMSException.rethrowAsJMSException("Receive interupted", t);
+               throw new UnreachableStatementException();
             }
+            finally
+            {
+               waitingForMessage = false;
+            }
          }
       }
+      finally
+      {
+         synchronized (stateLock)
+         {
+            receiving = false;
+         }
+      }
    }
 
    public Message receive(long timeOut) throws JMSException
@@ -375,94 +371,89 @@
       if (trace)
          log.trace("receive(long) endTime=" + endTime + " " + this);
       
-      synchronized (messages)
+      try
       {
-         //see if we have any undelivered messages before we go to the JMS
-         //server to look.
-         Message message = getMessage();
-         if (message != null)
+         synchronized (messages)
          {
-            synchronized (stateLock)
+            //see if we have any undelivered messages before we go to the JMS
+            //server to look.
+            Message message = getMessage();
+            if (message != null)
             {
-               receiving = false;
-               
                if (trace)
                   log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
+               return message;
             }
-            return message;
-         }
-         // Loop through expired messages
-         while (true)
-         {
-            SpyMessage msg = session.connection.receive(subscription, timeOut);
-            if (msg != null)
+            // Loop through expired messages
+            while (true)
             {
-               Message mes = preProcessMessage(msg);
-               if (mes != null)
+               SpyMessage msg = session.connection.receive(subscription, timeOut);
+               if (msg != null)
                {
-                  synchronized (stateLock)
+                  Message mes = preProcessMessage(msg);
+                  if (mes != null)
                   {
-                     receiving = false;
-                     
                      if (trace)
                         log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
+                     return mes;
                   }
-                  return mes;
                }
+               else
+                  break;
             }
-            else
-               break;
-         }
 
-         if (trace)
-            log.trace("No message in receive(), waiting " + this);
-         
-         try
-         {
-            waitingForMessage = true;
-            while (true)
+            if (trace)
+               log.trace("No message in receive(), waiting " + this);
+            
+            try
             {
-               if (isClosed())
+               waitingForMessage = true;
+               while (true)
                {
-                  if (trace)
-                     log.trace("Consumer closed in receive(long) " + this);
-                  return null;
-               }
+                  if (isClosed())
+                  {
+                     if (trace)
+                        log.trace("Consumer closed in receive(long) " + this);
+                     return null;
+                  }
 
-               Message mes = getMessage();
-               if (mes != null)
-               {
-                  if (trace)
-                     log.trace("receive(long) message from list after wait " + this);
-                  return mes;
-               }
+                  Message mes = getMessage();
+                  if (mes != null)
+                  {
+                     if (trace)
+                        log.trace("receive(long) message from list after wait " + this);
+                     return mes;
+                  }
 
-               long att = endTime - System.currentTimeMillis();
-               if (att <= 0)
-               {
-                  if (trace)
-                     log.trace("receive(long) timed out endTime=" + endTime + " " + this);
-                  return null;
-               }
+                  long att = endTime - System.currentTimeMillis();
+                  if (att <= 0)
+                  {
+                     if (trace)
+                        log.trace("receive(long) timed out endTime=" + endTime + " " + this);
+                     return null;
+                  }
 
-               messages.wait(att);
+                  messages.wait(att);
+               }
             }
-         }
-         catch (Throwable t)
-         {
-            SpyJMSException.rethrowAsJMSException("Receive interupted", t);
-            throw new UnreachableStatementException();
-         }
-         finally
-         {
-            waitingForMessage = false;
-            synchronized (stateLock)
+            catch (Throwable t)
             {
-               receiving = false;
+               SpyJMSException.rethrowAsJMSException("Receive interupted", t);
+               throw new UnreachableStatementException();
             }
+            finally
+            {
+               waitingForMessage = false;
+            }
          }
       }
-
+      finally
+      {
+         synchronized (stateLock)
+         {
+            receiving = false;
+         }
+      }
    }
 
    public Message receiveNoWait() throws JMSException
@@ -480,54 +471,49 @@
             log.trace("receiveNoWait() " + this);
       }
 
-      //see if we have any undelivered messages before we go to the JMS
-      //server to look.
-      synchronized (messages)
+      try
       {
-         Message mes = getMessage();
-         if (mes != null)
+         //see if we have any undelivered messages before we go to the JMS
+         //server to look.
+         synchronized (messages)
          {
-            synchronized (stateLock)
+            Message mes = getMessage();
+            if (mes != null)
             {
-               receiving = false;
-               
                if (trace)
                   log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
+               return mes;
             }
-            return mes;
          }
-      }
-      // Loop through expired messages
-      while (true)
-      {
-         SpyMessage msg = session.connection.receive(subscription, -1);
-         if (msg != null)
+         // Loop through expired messages
+         while (true)
          {
-            Message mes = preProcessMessage(msg);
-            if (mes != null)
+            SpyMessage msg = session.connection.receive(subscription, -1);
+            if (msg != null)
             {
-               synchronized (stateLock)
+               Message mes = preProcessMessage(msg);
+               if (mes != null)
                {
-                  receiving = false;
-                  
                   if (trace)
                      log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
+                  return mes;
                }
-               return mes;
             }
-         }
-         else
-         {
-            synchronized (stateLock)
+            else
             {
-               receiving = false;
+               if (trace)
+                  log.trace("receiveNoWait() no message " + this);
+               return null;
             }
-            
-            if (trace)
-               log.trace("receiveNoWait() no message " + this);
-            return null;
          }
       }
+      finally
+      {
+         synchronized (stateLock)
+         {
+            receiving = false;
+         }
+      }
    }
 
    public void close() throws JMSException

Modified: trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java	2006-11-15 16:14:37 UTC (rev 58403)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java	2006-11-15 16:52:59 UTC (rev 58404)
@@ -1,35 +1,37 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.test.jbossmq;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.jboss.mq.SpyDestination;
+import org.jboss.mq.SpyQueue;
+import org.jboss.mq.server.JMSDestinationManager;
+import org.jboss.mq.server.JMSQueue;
+import org.jboss.test.AbstractTestDelegate;
+import org.jboss.test.jbossmq.support.MockServerFailureInterceptor;
+import org.jboss.test.kernel.junit.MicrocontainerTest;
 
-import org.jboss.mq.SpyQueue;
-import org.jboss.mq.server.JMSDestinationManager;
-import org.jboss.mq.server.JMSQueue;
-import org.jboss.test.AbstractTestDelegate;
-import org.jboss.test.kernel.junit.MicrocontainerTest;
-
 /**
  * JBossMQMicrocontainerTest.
  * 
@@ -83,4 +85,20 @@
       server.addDestination(realQueue);
       return queue;
    }
+   
+   protected void removeDestination(SpyDestination destination) throws Exception
+   {
+      JMSDestinationManager server = getJMSServer();
+      server.closeDestination(destination);
+   }
+   
+   protected MockServerFailureInterceptor getMockServerFailure() throws Exception
+   {
+      return (MockServerFailureInterceptor) getBean("MockServerFailure");
+   }
+   
+   protected void raiseReceiveError(boolean value) throws Exception
+   {
+      getMockServerFailure().raiseReceiveError = value; 
+   }
 }




More information about the jboss-cvs-commits mailing list