[jboss-cvs] JBossAS SVN: r58404 - in trunk: jbossmq/src/main/org/jboss/mq testsuite/src/main/org/jboss/test/jbossmq
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 15 11:53:06 EST 2006
Author: adrian at jboss.org
Date: 2006-11-15 11:52:59 -0500 (Wed, 15 Nov 2006)
New Revision: 58404
Modified:
trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java
Log:
[JBAS-3821] - Error from server during receive corrupts internal receiving state.
Modified: trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
===================================================================
--- trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java 2006-11-15 16:14:37 UTC (rev 58403)
+++ trunk/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java 2006-11-15 16:52:59 UTC (rev 58404)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.mq;
import java.util.LinkedList;
@@ -268,84 +268,80 @@
log.trace("receive() " + this);
}
- synchronized (messages)
+ try
{
- //see if we have any undelivered messages before we go to the JMS
- //server to look.
- Message message = getMessage();
- if (message != null)
+ synchronized (messages)
{
- synchronized (stateLock)
+ //see if we have any undelivered messages before we go to the JMS
+ //server to look.
+ Message message = getMessage();
+ if (message != null)
{
- receiving = false;
-
if (trace)
log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
+ return message;
}
- return message;
- }
-
- // Loop through expired messages
- while (true)
- {
- SpyMessage msg = session.connection.receive(subscription, 0);
- if (msg != null)
+
+ // Loop through expired messages
+ while (true)
{
- Message mes = preProcessMessage(msg);
- if (mes != null)
+ SpyMessage msg = session.connection.receive(subscription, 0);
+ if (msg != null)
{
- synchronized (stateLock)
+ Message mes = preProcessMessage(msg);
+ if (mes != null)
{
- receiving = false;
-
if (trace)
log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
+ return mes;
}
- return mes;
}
+ else
+ break;
}
- else
- break;
- }
- if (trace)
- log.trace("No message in receive(), waiting " + this);
-
- try
- {
- waitingForMessage = true;
- while (true)
+ if (trace)
+ log.trace("No message in receive(), waiting " + this);
+
+ try
{
- if (isClosed())
+ waitingForMessage = true;
+ while (true)
{
- if (trace)
- log.trace("Consumer closed in receive() " + this);
- return null;
+ if (isClosed())
+ {
+ if (trace)
+ log.trace("Consumer closed in receive() " + this);
+ return null;
+ }
+ Message mes = getMessage();
+ if (mes != null)
+ {
+ if (trace)
+ log.trace("receive() message from list after wait " + this);
+ return mes;
+ }
+ messages.wait();
}
- Message mes = getMessage();
- if (mes != null)
- {
- if (trace)
- log.trace("receive() message from list after wait " + this);
- return mes;
- }
- messages.wait();
}
- }
- catch (Throwable t)
- {
- SpyJMSException.rethrowAsJMSException("Receive interupted", t);
- throw new UnreachableStatementException();
- }
- finally
- {
- waitingForMessage = false;
- synchronized (stateLock)
+ catch (Throwable t)
{
- receiving = false;
+ SpyJMSException.rethrowAsJMSException("Receive interupted", t);
+ throw new UnreachableStatementException();
}
+ finally
+ {
+ waitingForMessage = false;
+ }
}
}
+ finally
+ {
+ synchronized (stateLock)
+ {
+ receiving = false;
+ }
+ }
}
public Message receive(long timeOut) throws JMSException
@@ -375,94 +371,89 @@
if (trace)
log.trace("receive(long) endTime=" + endTime + " " + this);
- synchronized (messages)
+ try
{
- //see if we have any undelivered messages before we go to the JMS
- //server to look.
- Message message = getMessage();
- if (message != null)
+ synchronized (messages)
{
- synchronized (stateLock)
+ //see if we have any undelivered messages before we go to the JMS
+ //server to look.
+ Message message = getMessage();
+ if (message != null)
{
- receiving = false;
-
if (trace)
log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
+ return message;
}
- return message;
- }
- // Loop through expired messages
- while (true)
- {
- SpyMessage msg = session.connection.receive(subscription, timeOut);
- if (msg != null)
+ // Loop through expired messages
+ while (true)
{
- Message mes = preProcessMessage(msg);
- if (mes != null)
+ SpyMessage msg = session.connection.receive(subscription, timeOut);
+ if (msg != null)
{
- synchronized (stateLock)
+ Message mes = preProcessMessage(msg);
+ if (mes != null)
{
- receiving = false;
-
if (trace)
log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
+ return mes;
}
- return mes;
}
+ else
+ break;
}
- else
- break;
- }
- if (trace)
- log.trace("No message in receive(), waiting " + this);
-
- try
- {
- waitingForMessage = true;
- while (true)
+ if (trace)
+ log.trace("No message in receive(), waiting " + this);
+
+ try
{
- if (isClosed())
+ waitingForMessage = true;
+ while (true)
{
- if (trace)
- log.trace("Consumer closed in receive(long) " + this);
- return null;
- }
+ if (isClosed())
+ {
+ if (trace)
+ log.trace("Consumer closed in receive(long) " + this);
+ return null;
+ }
- Message mes = getMessage();
- if (mes != null)
- {
- if (trace)
- log.trace("receive(long) message from list after wait " + this);
- return mes;
- }
+ Message mes = getMessage();
+ if (mes != null)
+ {
+ if (trace)
+ log.trace("receive(long) message from list after wait " + this);
+ return mes;
+ }
- long att = endTime - System.currentTimeMillis();
- if (att <= 0)
- {
- if (trace)
- log.trace("receive(long) timed out endTime=" + endTime + " " + this);
- return null;
- }
+ long att = endTime - System.currentTimeMillis();
+ if (att <= 0)
+ {
+ if (trace)
+ log.trace("receive(long) timed out endTime=" + endTime + " " + this);
+ return null;
+ }
- messages.wait(att);
+ messages.wait(att);
+ }
}
- }
- catch (Throwable t)
- {
- SpyJMSException.rethrowAsJMSException("Receive interupted", t);
- throw new UnreachableStatementException();
- }
- finally
- {
- waitingForMessage = false;
- synchronized (stateLock)
+ catch (Throwable t)
{
- receiving = false;
+ SpyJMSException.rethrowAsJMSException("Receive interupted", t);
+ throw new UnreachableStatementException();
}
+ finally
+ {
+ waitingForMessage = false;
+ }
}
}
-
+ finally
+ {
+ synchronized (stateLock)
+ {
+ receiving = false;
+ }
+ }
}
public Message receiveNoWait() throws JMSException
@@ -480,54 +471,49 @@
log.trace("receiveNoWait() " + this);
}
- //see if we have any undelivered messages before we go to the JMS
- //server to look.
- synchronized (messages)
+ try
{
- Message mes = getMessage();
- if (mes != null)
+ //see if we have any undelivered messages before we go to the JMS
+ //server to look.
+ synchronized (messages)
{
- synchronized (stateLock)
+ Message mes = getMessage();
+ if (mes != null)
{
- receiving = false;
-
if (trace)
log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
+ return mes;
}
- return mes;
}
- }
- // Loop through expired messages
- while (true)
- {
- SpyMessage msg = session.connection.receive(subscription, -1);
- if (msg != null)
+ // Loop through expired messages
+ while (true)
{
- Message mes = preProcessMessage(msg);
- if (mes != null)
+ SpyMessage msg = session.connection.receive(subscription, -1);
+ if (msg != null)
{
- synchronized (stateLock)
+ Message mes = preProcessMessage(msg);
+ if (mes != null)
{
- receiving = false;
-
if (trace)
log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
+ return mes;
}
- return mes;
}
- }
- else
- {
- synchronized (stateLock)
+ else
{
- receiving = false;
+ if (trace)
+ log.trace("receiveNoWait() no message " + this);
+ return null;
}
-
- if (trace)
- log.trace("receiveNoWait() no message " + this);
- return null;
}
}
+ finally
+ {
+ synchronized (stateLock)
+ {
+ receiving = false;
+ }
+ }
}
public void close() throws JMSException
Modified: trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java 2006-11-15 16:14:37 UTC (rev 58403)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmq/JBossMQMicrocontainerTest.java 2006-11-15 16:52:59 UTC (rev 58404)
@@ -1,35 +1,37 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.test.jbossmq;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.jboss.mq.SpyDestination;
+import org.jboss.mq.SpyQueue;
+import org.jboss.mq.server.JMSDestinationManager;
+import org.jboss.mq.server.JMSQueue;
+import org.jboss.test.AbstractTestDelegate;
+import org.jboss.test.jbossmq.support.MockServerFailureInterceptor;
+import org.jboss.test.kernel.junit.MicrocontainerTest;
-import org.jboss.mq.SpyQueue;
-import org.jboss.mq.server.JMSDestinationManager;
-import org.jboss.mq.server.JMSQueue;
-import org.jboss.test.AbstractTestDelegate;
-import org.jboss.test.kernel.junit.MicrocontainerTest;
-
/**
* JBossMQMicrocontainerTest.
*
@@ -83,4 +85,20 @@
server.addDestination(realQueue);
return queue;
}
+
+ protected void removeDestination(SpyDestination destination) throws Exception
+ {
+ JMSDestinationManager server = getJMSServer();
+ server.closeDestination(destination);
+ }
+
+ protected MockServerFailureInterceptor getMockServerFailure() throws Exception
+ {
+ return (MockServerFailureInterceptor) getBean("MockServerFailure");
+ }
+
+ protected void raiseReceiveError(boolean value) throws Exception
+ {
+ getMockServerFailure().raiseReceiveError = value;
+ }
}
More information about the jboss-cvs-commits mailing list