[jboss-cvs] JBoss Messaging SVN: r4547 - in trunk: src/main/org/jboss/messaging/jms/client and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 23 05:45:38 EDT 2008
Author: timfox
Date: 2008-06-23 05:45:38 -0400 (Mon, 23 Jun 2008)
New Revision: 4547
Removed:
trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
Log:
Removed ASF stuff
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java 2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java 2008-06-23 09:45:38 UTC (rev 4547)
@@ -52,50 +52,50 @@
// Attributes -----------------------------------------------------------------------------------
- private Map<Long /* remoting session ID */, List<ServerConnection>> endpoints;
+ private Map<Long /* remoting session ID */, List<ServerConnection>> connections;
// Constructors ---------------------------------------------------------------------------------
public ConnectionManagerImpl()
{
- endpoints = new HashMap<Long, List<ServerConnection>>();
+ connections = new HashMap<Long, List<ServerConnection>>();
}
// ConnectionManager implementation -------------------------------------------------------------
- public synchronized void registerConnection(long remotingClientSessionID,
- ServerConnection endpoint)
+ public synchronized void registerConnection(final long remotingClientSessionID,
+ final ServerConnection connection)
{
- List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> connectionEndpoints = connections.get(remotingClientSessionID);
if (connectionEndpoints == null)
{
connectionEndpoints = new ArrayList<ServerConnection>();
- endpoints.put(remotingClientSessionID, connectionEndpoints);
+ connections.put(remotingClientSessionID, connectionEndpoints);
}
- connectionEndpoints.add(endpoint);
+ connectionEndpoints.add(connection);
- log.debug("registered connection " + endpoint + " as " + remotingClientSessionID);
+ log.debug("registered connection " + connection + " as " + remotingClientSessionID);
}
- public synchronized ServerConnection unregisterConnection(long remotingClientSessionID,
- ServerConnection endpoint)
+ public synchronized ServerConnection unregisterConnection(final long remotingClientSessionID,
+ final ServerConnection connection)
{
- List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> connectionEndpoints = connections.get(remotingClientSessionID);
if (connectionEndpoints != null)
{
- connectionEndpoints.remove(endpoint);
+ connectionEndpoints.remove(connection);
- log.debug("unregistered connection " + endpoint + " with remoting session ID " + remotingClientSessionID);
+ log.debug("unregistered connection " + connection + " with remoting session ID " + remotingClientSessionID);
if (connectionEndpoints.isEmpty())
{
- endpoints.remove(remotingClientSessionID);
+ connections.remove(remotingClientSessionID);
}
- return endpoint;
+ return connection;
}
return null;
}
@@ -104,9 +104,9 @@
{
// I will make a copy to avoid ConcurrentModification
List<ServerConnection> list = new ArrayList<ServerConnection>();
- for (List<ServerConnection> connections : endpoints.values())
+ for (List<ServerConnection> conns : connections.values())
{
- list.addAll(connections);
+ list.addAll(conns);
}
return list;
}
@@ -114,9 +114,9 @@
public synchronized int size()
{
int size = 0;
- for (List<ServerConnection> connections : endpoints.values())
+ for (List<ServerConnection> conns : connections.values())
{
- size += connections.size();
+ size += conns.size();
}
return size;
}
@@ -129,7 +129,7 @@
{
log.warn(me.getMessage(), me);
}
- closeConsumers(sessionID);
+ closeConnections(sessionID);
}
// Public ---------------------------------------------------------------------------------------
@@ -145,26 +145,32 @@
// Private --------------------------------------------------------------------------------------
- private synchronized void closeConsumers(long remotingClientSessionID)
+ private synchronized void closeConnections(final long remotingClientSessionID)
{
- List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> conns = connections.get(remotingClientSessionID);
- if (connectionEndpoints == null || connectionEndpoints.isEmpty())
+ if (conns == null || conns.isEmpty())
+ {
return;
+ }
// we still have connections open for the session
- log.warn("A problem has been detected with the connection to remote client " +
+ log.warn("A problem has been detected with the connection from client " +
remotingClientSessionID + ". It is possible the client has exited without closing " +
"its connection(s) or the network has failed. All connection resources " +
"corresponding to that client process will now be removed.");
- // the connection endpoints are copied in a new list to avoid concurrent modification exception
+ // the connections are copied in a new list to avoid concurrent modification exception
List<ServerConnection> copy;
- if (connectionEndpoints != null)
- copy = new ArrayList<ServerConnection>(connectionEndpoints);
+ if (conns != null)
+ {
+ copy = new ArrayList<ServerConnection>(conns);
+ }
else
+ {
copy = new ArrayList<ServerConnection>();
+ }
for (ServerConnection sce : copy)
{
@@ -179,35 +185,5 @@
log.error("Failed to close connection", e);
}
}
-
- //dump();
- }
-
-// private void dump()
-// {
-// if (log.isDebugEnabled())
-// {
-// StringBuffer buff = new StringBuffer("*********** Dumping connections\n");
-// buff.append("remoting session ID -----> server connection endpoints:\n");
-// if (endpoints.size() == 0)
-// {
-// buff.append(" No registered endpoints\n");
-// }
-// for (Entry<Long, List<ServerConnection>> entry : endpoints.entrySet())
-// {
-// List<ServerConnection> connectionEndpoints = entry.getValue();
-// buff.append(" " + entry.getKey() + "----->\n");
-// for (ServerConnection sce : connectionEndpoints)
-// {
-// buff.append(" " + sce + " (" + System.identityHashCode(sce) + ") " + sce.getClientAddress() + "\n");
-// }
-// }
-// buff.append("*** Dumped connections");
-//
-// log.debug(buff);
-// }
-// }
-
- // Inner classes --------------------------------------------------------------------------------
-
+ }
}
Deleted: trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java 2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java 2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.jms.client;
-
-import org.jboss.messaging.core.client.ClientSession;
-
-/**
- *
- * A AsfMessageHolder
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class AsfMessageHolder
-{
- private final JBossMessage msg;
-
- private final String consumerID;
-
- private final String queueName;
-
- private final int maxDeliveries;
-
- private final ClientSession connectionConsumerSession;
-
- public AsfMessageHolder(final JBossMessage msg, final String consumerID,
- final String queueName, final int maxDeliveries,
- final ClientSession connectionConsumerSession)
- {
- this.msg = msg;
-
- this.consumerID = consumerID;
-
- this.queueName = queueName;
-
- this.maxDeliveries = maxDeliveries;
-
- this.connectionConsumerSession = connectionConsumerSession;
- }
-
- public JBossMessage getMsg()
- {
- return msg;
- }
-
- public String getConsumerID()
- {
- return consumerID;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public int getMaxDeliveries()
- {
- return maxDeliveries;
- }
-
- public ClientSession getConnectionConsumerSession()
- {
- return connectionConsumerSession;
- }
-}
Deleted: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java 2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java 2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,336 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.jms.client;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.JBossDestination;
-
-/**
- * This class implements javax.jms.ConnectionConsumer
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- *
- * Partially based on JBossMQ version by:
- *
- * @author Hiram Chirino (Cojonudo14 at hotmail.com)
- * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- *
- * @version $Revision$
- *
- * $Id$
- */
-public class JBossConnectionConsumer implements ConnectionConsumer, Runnable
-{
- // Constants -----------------------------------------------------
-
- private static Logger log = Logger.getLogger(JBossConnectionConsumer.class);
-
- private static boolean trace = log.isTraceEnabled();
-
- private static final int TIMEOUT = 20000;
-
- // Attributes ----------------------------------------------------
-
- private org.jboss.messaging.core.client.ClientConsumer cons;
-
- private org.jboss.messaging.core.client.ClientSession sess;
-
- private String consumerID;
-
- /** The ServerSessionPool that is implemented by the AS */
- private ServerSessionPool serverSessionPool;
-
- /** The maximum number of messages that a single session will be loaded with. */
- private int maxMessages;
-
- /** Is the ConnectionConsumer closed? */
- private volatile boolean closed;
-
- /** The "listening" thread that gets messages from destination and queues
- them for delivery to sessions */
- private Thread internalThread;
-
- /** The thread id */
- private int id;
-
- /** The thread id generator */
- private static AtomicInteger threadId = new AtomicInteger(0);
-
- private int maxDeliveries;
-
- private String queueName;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public JBossConnectionConsumer(org.jboss.messaging.core.client.ClientConnection conn, JBossDestination dest,
- String subName, String messageSelector,
- ServerSessionPool sessPool, int maxMessages) throws JMSException
- {
-// this.serverSessionPool = sessPool;
-// this.maxMessages = maxMessages;
-//
-// if (this.maxMessages < 1)
-// {
-// this.maxMessages = 1;
-// }
-//
-// // Create a consumer. The ClientConsumer knows we are a connection consumer so will
-// // not call pre or postDeliver so messages won't be acked, or stored in session/tx.
-// sess = conn.createClientSession(false, Session.CLIENT_ACKNOWLEDGE, false);
-//
-// //cons = sess.createClientConsumer(dest.toCoreDestination(), messageSelector, false, subName);
-//
-// this.consumerID = cons.getID();
-//
-// //this.maxDeliveries = cons.getMaxDeliveries();
-//
-// if (subName != null)
-// {
-// queueName = MessageQueueNameHelper.createSubscriptionName(conn.getClientID(), subName);
-// }
-// else
-// {
-// queueName = dest.getName();
-// }
-//
-// id = threadId.increment();
-// internalThread = new Thread(this, "Connection ClientConsumer for dest " + dest + " id=" + id);
-// internalThread.start();
-//
-// if (trace) { log.trace(this + " created"); }
- }
-
- // ConnectionConsumer implementation -----------------------------
-
- public ServerSessionPool getServerSessionPool() throws JMSException
- {
- return serverSessionPool;
- }
-
- public void close() throws JMSException
- {
- if (trace) { log.trace("close " + this); }
-
- doClose();
-
- //Wait for internal thread to complete
- if (trace) { log.trace(this + " Waiting for internal thread to complete"); }
-
- try
- {
- internalThread.join(TIMEOUT);
-
- if (internalThread.isAlive())
- {
- throw new JMSException(this + " Waited " + TIMEOUT + " ms for internal thread to complete, but it didn't");
- }
- }
- catch (InterruptedException e)
- {
- if (trace) { log.trace(this + " Thread interrupted while waiting for internal thread to complete"); }
- //Ignore
- }
-
- if (trace) { log.trace("Closed: " + this); }
- }
-
- // Runnable implementation ---------------------------------------
-
- public void run()
- {
- //TODO - need to work out how to get ASF to work with core
-
-// if (trace) { log.trace("running connection consumer"); }
-// try
-// {
-// List mesList = new ArrayList();
-//
-// while (true)
-// {
-// if (closed)
-// {
-// if (trace) { log.trace("Connection consumer is closed, breaking"); }
-// break;
-// }
-//
-// if (mesList.isEmpty())
-// {
-// // Remove up to maxMessages messages from the consumer
-// for (int i = 0; i < maxMessages; i++)
-// {
-// // receiveNoWait
-//
-// if (trace) { log.trace(this + " attempting to get message with receiveNoWait"); }
-//
-// Message m = null;
-//
-// try
-// {
-// m = cons.receive(-1);
-// }
-// catch (JMSException e)
-// {
-// //If the consumer is closed, we will get a JMSException so we ignore
-// if (!closed)
-// {
-// throw e;
-// }
-// }
-//
-// if (m == null)
-// {
-// if (trace) { log.trace("receiveNoWait did not retrieve any message"); }
-// break;
-// }
-//
-// if (trace) { log.trace("receiveNoWait got message " + m + " adding to queue"); }
-// mesList.add(m);
-// }
-//
-// if (mesList.isEmpty())
-// {
-// // We didn't get any messages doing receiveNoWait, so let's wait. This returns if
-// // a message is received or by the consumer closing.
-//
-// if (trace) { log.trace(this + " attempting to get message with blocking receive (no timeout)"); }
-//
-// Message m = null;
-//
-// try
-// {
-// m = cons.receive(0);
-// }
-// catch (JMSException e)
-// {
-// //If the consumer is closed, we will get a JMSException so we ignore
-// if (!closed)
-// {
-// throw e;
-// }
-// }
-//
-// if (m != null)
-// {
-// if (trace) { log.trace("receive (no timeout) got message " + m + " adding to queue"); }
-// mesList.add(m);
-// }
-// else
-// {
-// // The consumer must have closed
-// if (trace) { log.trace("blocking receive returned null, consumer must have closed"); }
-// break;
-// }
-// }
-// }
-//
-// if (!mesList.isEmpty())
-// {
-// if (trace) { log.trace("there are " + mesList.size() + " messages to send to session"); }
-//
-// ServerSession serverSession = serverSessionPool.getServerSession();
-// JBossSession session = (JBossSession)serverSession.getSession();
-//
-// MessageListener listener = session.getMessageListener();
-//
-// if (listener == null)
-// {
-// // Sanity check
-// if (trace) { log.trace(this + ": session " + session + " did not have a set MessageListener"); }
-// }
-//
-// for (int i = 0; i < mesList.size(); i++)
-// {
-// JBossMessage m = (JBossMessage)mesList.get(i);
-// session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess);
-// if (trace) { log.trace("added " + m + " to session"); }
-// }
-//
-// if (trace) { log.trace(this + " starting serverSession " + serverSession); }
-//
-// serverSession.start();
-//
-// if (trace) { log.trace(this + "'s serverSession processed messages"); }
-//
-// mesList.clear();
-// }
-// }
-// if (trace) { log.trace("ConnectionConsumer run() exiting"); }
-// }
-// catch (Throwable t)
-// {
-// log.debug("Connection consumer closing due to error in listening thread " + this, t);
-//
-// try
-// {
-// //Closing
-// doClose();
-// }
-// catch (JMSException e)
-// {
-// log.error("Failed to close connection consumer", e);
-// }
-// }
- }
-
- protected synchronized void doClose() throws JMSException
- {
-// if (closed)
-// {
-// return;
-// }
-//
-// closed = true;
-//
-// sess.closing();
-// sess.close();
-
- if (trace) { log.trace(this + "Closed message handler"); }
- }
-
- // Public --------------------------------------------------------
-
- public String toString()
- {
- return "JBossConnectionConsumer[" + consumerID + ", " + id + "]";
- }
-
- // Object overrides ----------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-06-23 09:45:38 UTC (rev 4547)
@@ -22,6 +22,41 @@
package org.jboss.messaging.jms.client;
+import java.io.Serializable;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.transaction.xa.XAResource;
+
import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientProducer;
@@ -30,17 +65,18 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.*;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
import org.jboss.messaging.util.SimpleString;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.transaction.xa.XAResource;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.UUID;
-
/**
+ *
+ * Note that we *do not* support JMS ASF (Application Server Facilities) optional
+ * constructs such as ConnectionConsumer
+ *
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
@@ -76,11 +112,7 @@
private final boolean transacted;
private final boolean xa;
-
- private LinkedList<AsfMessageHolder> asfMessages;
-
- private MessageListener distinguishedListener;
-
+
private boolean recoverCalled;
// Constructors --------------------------------------------------
@@ -257,44 +289,16 @@
{
checkClosed();
- return distinguishedListener;
+ return null;
}
public void setMessageListener(final MessageListener listener) throws JMSException
{
- checkClosed();
-
- this.distinguishedListener = listener;
+ checkClosed();
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
public void run()
{
-// try
-// {
-// if (asfMessages != null)
-// {
-// while (asfMessages.size() > 0)
-// {
-// AsfMessageHolder holder = (AsfMessageHolder)asfMessages.removeFirst();
-//
-// session.preDeliver(holder.getMsg().getDeliveryId());
-//
-// session.postDeliver();
-//
-// distinguishedListener.onMessage(holder.getMsg());
-// }
-// }
-// }
-// catch (Exception e)
-// {
-// log.error("Failed to process ASF messages", e);
-// }
-
- //Need to work out how to get ASF to work with core
}
public MessageProducer createProducer(final Destination destination) throws JMSException
@@ -825,27 +829,7 @@
}
// Package protected ---------------------------------------------
-
- /*
- * This method is used by the JBossConnectionConsumer to load up the session
- * with messages to be processed by the session's run() method
- */
- void addAsfMessage(final JBossMessage m, final String consumerID, final String queueName, final int maxDeliveries,
- final ClientSession connectionConsumerSession) throws JMSException
- {
-
- AsfMessageHolder holder =
- new AsfMessageHolder(m, consumerID, queueName, maxDeliveries,
- connectionConsumerSession);
-
- if (asfMessages == null)
- {
- asfMessages = new LinkedList<AsfMessageHolder>();
- }
-
- asfMessages.add(holder);
- }
-
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
Deleted: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,594 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.messaging.jms.client.JBossConnectionConsumer;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-
-/**
- * ConnectionConsumer tests
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ConnectionConsumerTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConnectionConsumerTest(String name)
- {
- super(name);
- }
-
- // TestCase overrides -------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testSimple() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
- final int NUM_MESSAGES = 100;
-
- Connection connConsumer = null;
-
- Connection connProducer = null;
-
- try
- {
- connConsumer = cf.createConnection();
-
- connConsumer.start();
-
- Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- SimpleMessageListener listener = new SimpleMessageListener(NUM_MESSAGES);
-
- sessCons.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
- JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
- connProducer = cf.createConnection();
-
- Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProd.createProducer(queue1);
-
- forceGC();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage m = sessProd.createTextMessage("testing testing");
- prod.send(m);
- }
-
- //Wait for messages
-
- listener.waitForLatch(10000);
-
- if (listener.getMsgsReceived() != NUM_MESSAGES)
- {
- fail("Didn't receive all messages");
- }
-
- if (listener.failed)
- {
- fail ("Didn't receive correct messages");
- }
-
- cc.close();
-
- connProducer.close();
- connProducer = null;
- connConsumer.close();
- connConsumer = null;
- }
- finally
- {
- if (connConsumer != null) connConsumer.close();
- if (connProducer != null) connProducer.close();
- }
- }
-
-
-
- public void testRedeliveryTransacted() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
- Connection connConsumer = null;
-
- Connection connProducer = null;
-
- try
- {
- connConsumer = cf.createConnection();
-
- connConsumer.start();
-
- Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
-
- RedelMessageListener listener = new RedelMessageListener(sessCons);
-
- sessCons.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
- JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
- connProducer = cf.createConnection();
-
- Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProd.createProducer(queue1);
-
- TextMessage m1 = sessProd.createTextMessage("a");
- TextMessage m2 = sessProd.createTextMessage("b");
- TextMessage m3 = sessProd.createTextMessage("c");
- prod.send(m1);
- prod.send(m2);
- prod.send(m3);
-
- //Wait for messages
-
- listener.waitForLatch(10000);
-
- if (listener.failed)
- {
- fail ("Didn't receive correct messages");
- }
-
- cc.close();
-
- connProducer.close();
- connProducer = null;
- connConsumer.close();
- connConsumer = null;
- }
- finally
- {
- if (connConsumer != null) connConsumer.close();
- if (connConsumer != null) connProducer.close();
- }
- }
-
-
- public void testRedeliveryTransactedDifferentConnection() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
- Connection connConnectionConsumer = null;
-
- Connection connConsumer = null;
-
- Connection connProducer = null;
-
- try
- {
- connConsumer = cf.createConnection();
-
- connConsumer.start();
-
- Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
-
- RedelMessageListener listener = new RedelMessageListener(sessCons);
-
- sessCons.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
- connConnectionConsumer = cf.createConnection();
-
- connConnectionConsumer.start();
-
- JBossConnectionConsumer cc = (JBossConnectionConsumer)connConnectionConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
- connProducer = cf.createConnection();
-
- Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProd.createProducer(queue1);
-
- TextMessage m1 = sessProd.createTextMessage("a");
- TextMessage m2 = sessProd.createTextMessage("b");
- TextMessage m3 = sessProd.createTextMessage("c");
- prod.send(m1);
- prod.send(m2);
- prod.send(m3);
-
- //Wait for messages
-
- listener.waitForLatch(10000);
-
- if (listener.failed)
- {
- fail ("Didn't receive correct messages");
- }
-
- cc.close();
-
- connProducer.close();
- connProducer = null;
- connConsumer.close();
- connConsumer = null;
- connConnectionConsumer.close();
- connConnectionConsumer = null;
- }
- finally
- {
- if (connConsumer != null) connConsumer.close();
- if (connConsumer != null) connProducer.close();
- if (connConnectionConsumer != null) connConnectionConsumer.close();
- }
- }
-
- public void testCloseWhileProcessing() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
- final int NUM_MESSAGES = 100;
-
- Connection connConsumer = null;
-
- Connection connProducer = null;
-
- try
- {
- connConsumer = cf.createConnection();
-
- connConsumer.start();
-
- Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- SimpleMessageListener listener = new SimpleMessageListener(NUM_MESSAGES);
-
- sessCons.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
- JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
- connProducer = cf.createConnection();
-
- Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessProd.createProducer(queue1);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage m = sessProd.createTextMessage("testing testing");
- prod.send(m);
- }
-
- cc.close();
-
- connProducer.close();
- connProducer = null;
- connConsumer.close();
- connConsumer = null;
- }
- finally
- {
- if (connConsumer != null) connConsumer.close();
- if (connConsumer != null) connProducer.close();
-
- removeAllMessages(queue1.getQueueName(), true, 0);
- }
- }
-
- public void testStopWhileProcessing() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
-
- Connection connConsumer = null;
-
- try
- {
- connConsumer = cf.createConnection();
-
- connConsumer.start();
-
- Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- SimpleMessageListener listener = new SimpleMessageListener(0);
-
- sessCons.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
- JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
- stopServerPeer();
- connConsumer.close();
- startServerPeer();
- deployAndLookupAdministeredObjects();
- connConsumer = null;
- }
- finally
- {
- if (connConsumer != null) connConsumer.close();
- }
- }
-
-
- class SimpleMessageListener implements MessageListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- boolean failed;
-
- int msgsReceived;
-
- int numExpectedMsgs;
-
- SimpleMessageListener(int numExpectedMsgs)
- {
- this.numExpectedMsgs = numExpectedMsgs;
-
- }
-
- synchronized void incMsgsReceived()
- {
- msgsReceived++;
- if (msgsReceived == numExpectedMsgs)
- {
- latch.countDown();
- }
-
- }
-
- synchronized int getMsgsReceived()
- {
- return msgsReceived;
- }
-
-
- void waitForLatch(long timeout) throws Exception
- {
- latch.await(timeout, MILLISECONDS);
- //Thread.sleep(2000); //Enough time for postDeliver to complete
- }
-
- public synchronized void onMessage(Message message)
- {
- try
- {
- TextMessage tm = (TextMessage)message;
-
- if (!tm.getText().equals("testing testing"))
- {
- failed = true;
- }
-
- incMsgsReceived();
-
- }
- catch (Exception e)
- {
- log.error(e);
- failed = true;
- }
- }
- }
-
- class RedelMessageListener implements MessageListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- boolean failed;
-
- int count;
-
- Session sess;
-
- RedelMessageListener(Session sess)
- {
- this.sess = sess;
- }
-
- void waitForLatch(long timeout) throws Exception
- {
- latch.await(timeout, MILLISECONDS);
- }
-
- public synchronized void onMessage(Message message)
- {
- try
- {
- count++;
-
- TextMessage tm = (TextMessage)message;
-
- log.trace("Got message " + tm.getText() + " count=" + count);
-
- if (count == 1)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("a"))
- {
- log.trace("Expected a but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- }
- if (count == 2)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("b"))
- {
- log.trace("Expected b but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- }
- if (count == 3)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("c"))
- {
- log.trace("Expected c but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- else
- {
- if (sess.getAcknowledgeMode() == Session.SESSION_TRANSACTED)
- {
- log.trace("Rolling back");
- sess.rollback();
- }
- }
- }
- if (count == 4)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("a"))
- {
- log.trace("Expected a but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- if (!tm.getJMSRedelivered())
- {
-
- failed = true;
- latch.countDown();
- }
- }
- if (count == 5)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("b"))
- {
- log.trace("Expected b but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- if (!tm.getJMSRedelivered())
- {
- log.trace("Redelivered flag not set");
- failed = true;
- latch.countDown();
- }
- }
- if (count == 6)
- {
- log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-
- if (!tm.getText().equals("c"))
- {
- log.trace("Expected c but was " + tm.getText());
- failed = true;
- latch.countDown();
- }
- if (!tm.getJMSRedelivered())
- {
- log.trace("Redelivered flag not set");
- failed = true;
- latch.countDown();
- }
- else
- {
- if (sess.getAcknowledgeMode() == Session.SESSION_TRANSACTED)
- {
- log.trace("Committing");
- sess.commit();
- }
- latch.countDown();
- }
- }
-
- }
- catch (JMSException e)
- {
- log.error(e);
- failed = true;
- }
- }
- }
-
-
- class MockServerSessionPool implements ServerSessionPool
- {
- private ServerSession serverSession;
-
- MockServerSessionPool(Session sess)
- {
- serverSession = new MockServerSession(sess);
- }
-
- public ServerSession getServerSession() throws JMSException
- {
- return serverSession;
- }
- }
-
- class MockServerSession implements ServerSession
- {
- Session session;
-
- MockServerSession(Session sess)
- {
- this.session = sess;
- }
-
-
- public Session getSession() throws JMSException
- {
- return session;
- }
-
- public void start() throws JMSException
- {
- session.run();
- }
-
- }
-
-}
More information about the jboss-cvs-commits
mailing list