[jboss-cvs] JBoss Messaging SVN: r4547 - in trunk: src/main/org/jboss/messaging/jms/client and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 23 05:45:38 EDT 2008


Author: timfox
Date: 2008-06-23 05:45:38 -0400 (Mon, 23 Jun 2008)
New Revision: 4547

Removed:
   trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
Log:
Removed ASF stuff


Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java	2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ConnectionManagerImpl.java	2008-06-23 09:45:38 UTC (rev 4547)
@@ -52,50 +52,50 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private Map<Long /* remoting session ID */, List<ServerConnection>> endpoints;
+   private Map<Long /* remoting session ID */, List<ServerConnection>> connections;
    
    // Constructors ---------------------------------------------------------------------------------
 
    public ConnectionManagerImpl()
    {
-      endpoints = new HashMap<Long, List<ServerConnection>>();
+      connections = new HashMap<Long, List<ServerConnection>>();
    }
 
    // ConnectionManager implementation -------------------------------------------------------------
 
-   public synchronized void registerConnection(long remotingClientSessionID,
-                                               ServerConnection endpoint)
+   public synchronized void registerConnection(final long remotingClientSessionID,
+                                               final ServerConnection connection)
    {    
-      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> connectionEndpoints = connections.get(remotingClientSessionID);
 
       if (connectionEndpoints == null)
       {
          connectionEndpoints = new ArrayList<ServerConnection>();
-         endpoints.put(remotingClientSessionID, connectionEndpoints);
+         connections.put(remotingClientSessionID, connectionEndpoints);
       }
 
-      connectionEndpoints.add(endpoint);
+      connectionEndpoints.add(connection);
 
-      log.debug("registered connection " + endpoint + " as " + remotingClientSessionID);
+      log.debug("registered connection " + connection + " as " + remotingClientSessionID);
    }
    
-   public synchronized ServerConnection unregisterConnection(long remotingClientSessionID,
-         ServerConnection endpoint)
+   public synchronized ServerConnection unregisterConnection(final long remotingClientSessionID,
+                                                             final ServerConnection connection)
    {
-      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> connectionEndpoints = connections.get(remotingClientSessionID);
 
       if (connectionEndpoints != null)
       {
-         connectionEndpoints.remove(endpoint);
+         connectionEndpoints.remove(connection);
 
-         log.debug("unregistered connection " + endpoint + " with remoting session ID " + remotingClientSessionID);
+         log.debug("unregistered connection " + connection + " with remoting session ID " + remotingClientSessionID);
 
          if (connectionEndpoints.isEmpty())
          {
-            endpoints.remove(remotingClientSessionID);           
+            connections.remove(remotingClientSessionID);           
          }
 
-         return endpoint;
+         return connection;
       }
       return null;
    }
@@ -104,9 +104,9 @@
    {
       // I will make a copy to avoid ConcurrentModification
       List<ServerConnection> list = new ArrayList<ServerConnection>();
-      for (List<ServerConnection> connections : endpoints.values())
+      for (List<ServerConnection> conns : connections.values())
       {
-         list.addAll(connections);
+         list.addAll(conns);
       }
       return list;
    }      
@@ -114,9 +114,9 @@
    public synchronized int size()
    {
       int size = 0;
-      for (List<ServerConnection> connections : endpoints.values())
+      for (List<ServerConnection> conns : connections.values())
       {
-         size += connections.size();
+         size += conns.size();
       }
       return size;
    }
@@ -129,7 +129,7 @@
       {
          log.warn(me.getMessage(), me);
       }
-      closeConsumers(sessionID);
+      closeConnections(sessionID);
    }
    
    // Public ---------------------------------------------------------------------------------------
@@ -145,26 +145,32 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private synchronized void closeConsumers(long remotingClientSessionID)
+   private synchronized void closeConnections(final long remotingClientSessionID)
    {
-      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> conns = connections.get(remotingClientSessionID);
       
-      if (connectionEndpoints == null || connectionEndpoints.isEmpty())
+      if (conns == null || conns.isEmpty())
+      {
          return;
+      }
       
       // we still have connections open for the session
       
-      log.warn("A problem has been detected with the connection to remote client " +
+      log.warn("A problem has been detected with the connection from client " +
             remotingClientSessionID + ". It is possible the client has exited without closing " +
             "its connection(s) or the network has failed. All connection resources " +
             "corresponding to that client process will now be removed.");
 
-      // the connection endpoints are copied in a new list to avoid concurrent modification exception
+      // the connection connections are copied in a new list to avoid concurrent modification exception
       List<ServerConnection> copy;
-      if (connectionEndpoints != null)
-         copy = new ArrayList<ServerConnection>(connectionEndpoints);
+      if (conns != null)
+      {
+         copy = new ArrayList<ServerConnection>(conns);
+      }
       else
+      {
          copy = new ArrayList<ServerConnection>();
+      }
          
       for (ServerConnection sce : copy)
       {
@@ -179,35 +185,5 @@
             log.error("Failed to close connection", e);
          }          
       }
-      
-      //dump();
-   }
-   
-//   private void dump()
-//   {
-//      if (log.isDebugEnabled())
-//      {
-//         StringBuffer buff = new StringBuffer("*********** Dumping connections\n");
-//         buff.append("remoting session ID -----> server connection endpoints:\n");
-//         if (endpoints.size() == 0)
-//         {
-//            buff.append("    No registered endpoints\n");
-//         }
-//         for (Entry<Long, List<ServerConnection>> entry : endpoints.entrySet())
-//         {
-//            List<ServerConnection> connectionEndpoints = entry.getValue();
-//            buff.append("    "  + entry.getKey() + "----->\n");
-//            for (ServerConnection sce : connectionEndpoints)
-//            {
-//               buff.append("        " + sce + " (" + System.identityHashCode(sce) + ") " + sce.getClientAddress() + "\n");
-//            }
-//         }
-//         buff.append("*** Dumped connections");
-//         
-//         log.debug(buff);
-//      }
-//   }
-   
-   // Inner classes --------------------------------------------------------------------------------
-
+   }   
 }

Deleted: trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java	2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/AsfMessageHolder.java	2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.jms.client;
-
-import org.jboss.messaging.core.client.ClientSession;
-
-/**
- * 
- * A AsfMessageHolder
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class AsfMessageHolder
-{
-   private final JBossMessage msg;
-   
-   private final String consumerID;
-   
-   private final String queueName;
-   
-   private final int maxDeliveries;
-   
-   private final ClientSession connectionConsumerSession;
-   
-   public AsfMessageHolder(final JBossMessage msg, final String consumerID,
-                           final String queueName, final int maxDeliveries,
-                           final ClientSession connectionConsumerSession)
-   {
-      this.msg = msg;
-      
-      this.consumerID = consumerID;
-      
-      this.queueName = queueName;
-      
-      this.maxDeliveries = maxDeliveries;
-      
-      this.connectionConsumerSession = connectionConsumerSession;
-   }
-
-   public JBossMessage getMsg()
-   {
-      return msg;
-   }
-
-   public String getConsumerID()
-   {
-      return consumerID;
-   }
-
-   public String getQueueName()
-   {
-      return queueName;
-   }
-
-   public int getMaxDeliveries()
-   {
-      return maxDeliveries;
-   }
-
-   public ClientSession getConnectionConsumerSession()
-   {
-      return connectionConsumerSession;
-   }
-}

Deleted: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java	2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionConsumer.java	2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,336 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.jms.client;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.JBossDestination;
-
-/**
- * This class implements javax.jms.ConnectionConsumer
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * 
- * Partially based on JBossMQ version by:
- * 
- * @author Hiram Chirino (Cojonudo14 at hotmail.com)
- * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- * 
- * @version $Revision$
- *
- * $Id$
- */
-public class JBossConnectionConsumer implements ConnectionConsumer, Runnable
-{
-   // Constants -----------------------------------------------------
-
-   private static Logger log = Logger.getLogger(JBossConnectionConsumer.class);
-
-   private static boolean trace = log.isTraceEnabled();
-   
-   private static final int TIMEOUT = 20000;
-   
-   // Attributes ----------------------------------------------------
-   
-   private org.jboss.messaging.core.client.ClientConsumer cons;
-   
-   private org.jboss.messaging.core.client.ClientSession sess;
-   
-   private String consumerID;
-   
-   /** The ServerSessionPool that is implemented by the AS */
-   private ServerSessionPool serverSessionPool;
-   
-   /** The maximum number of messages that a single session will be loaded with. */
-   private int maxMessages;
-   
-   /** Is the ConnectionConsumer closed? */
-   private volatile boolean closed;
-     
-   /** The "listening" thread that gets messages from destination and queues
-   them for delivery to sessions */
-   private Thread internalThread;
-   
-   /** The thread id */
-   private int id;
-   
-   /** The thread id generator */
-   private static AtomicInteger threadId = new AtomicInteger(0);
-   
-   private int maxDeliveries;
-   
-   private String queueName;
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-
-   public JBossConnectionConsumer(org.jboss.messaging.core.client.ClientConnection conn, JBossDestination dest,
-                                  String subName, String messageSelector,
-                                  ServerSessionPool sessPool, int maxMessages) throws JMSException
-   {
-//      this.serverSessionPool = sessPool;
-//      this.maxMessages = maxMessages;
-//
-//      if (this.maxMessages < 1)
-//      {
-//         this.maxMessages = 1;
-//      }
-//
-//      // Create a consumer. The ClientConsumer knows we are a connection consumer so will
-//      // not call pre or postDeliver so messages won't be acked, or stored in session/tx.
-//      sess = conn.createClientSession(false, Session.CLIENT_ACKNOWLEDGE, false);
-//          
-//      //cons = sess.createClientConsumer(dest.toCoreDestination(), messageSelector, false, subName);
-//
-//      this.consumerID = cons.getID();      
-//        
-//      //this.maxDeliveries = cons.getMaxDeliveries();
-//         
-//      if (subName != null)
-//      {
-//         queueName = MessageQueueNameHelper.createSubscriptionName(conn.getClientID(), subName);
-//      }
-//      else
-//      {
-//         queueName = dest.getName();
-//      }
-//
-//      id = threadId.increment();
-//      internalThread = new Thread(this, "Connection ClientConsumer for dest " + dest + " id=" + id);
-//      internalThread.start();
-//
-//      if (trace) { log.trace(this + " created"); }
-   }
-   
-   // ConnectionConsumer implementation -----------------------------
-
-   public ServerSessionPool getServerSessionPool() throws JMSException
-   {
-      return serverSessionPool;
-   }
-
-   public void close() throws JMSException
-   {
-      if (trace) { log.trace("close " + this); }
-      
-      doClose();
-      
-      //Wait for internal thread to complete
-      if (trace) { log.trace(this + " Waiting for internal thread to complete"); }
-      
-      try
-      {
-         internalThread.join(TIMEOUT);
-         
-         if (internalThread.isAlive())
-         {            
-            throw new JMSException(this + " Waited " + TIMEOUT + " ms for internal thread to complete, but it didn't");
-         }
-      }
-      catch (InterruptedException e)
-      {
-         if (trace) { log.trace(this + " Thread interrupted while waiting for internal thread to complete"); }
-         //Ignore
-      }
-      
-      if (trace) { log.trace("Closed: " + this); }      
-   }
-   
-   // Runnable implementation ---------------------------------------
-    
-   public void run()
-   {
-      //TODO - need to work out how to get ASF to work with core
-      
-//      if (trace) { log.trace("running connection consumer"); }
-//      try
-//      {
-//         List mesList = new ArrayList();
-//         
-//         while (true)
-//         {            
-//            if (closed)
-//            {
-//               if (trace) { log.trace("Connection consumer is closed, breaking"); }
-//               break;
-//            }
-//            
-//            if (mesList.isEmpty())
-//            {
-//               // Remove up to maxMessages messages from the consumer
-//               for (int i = 0; i < maxMessages; i++)
-//               {               
-//                  // receiveNoWait
-//
-//                  if (trace) { log.trace(this + " attempting to get message with receiveNoWait"); }
-//                  
-//                  Message m = null;
-//                  
-//                  try
-//                  {
-//                     m = cons.receive(-1);
-//                  }
-//                  catch (JMSException e)
-//                  {
-//                     //If the consumer is closed, we will get a JMSException so we ignore
-//                     if (!closed)
-//                     {
-//                        throw e;
-//                     }                        
-//                  }
-//               
-//                  if (m == null)
-//                  {
-//                     if (trace) { log.trace("receiveNoWait did not retrieve any message"); }
-//                     break;
-//                  }
-//
-//                  if (trace) { log.trace("receiveNoWait got message " + m + " adding to queue"); }
-//                  mesList.add(m);
-//               }
-//
-//               if (mesList.isEmpty())
-//               {
-//                  // We didn't get any messages doing receiveNoWait, so let's wait. This returns if
-//                  // a message is received or by the consumer closing.
-//
-//                  if (trace) { log.trace(this + " attempting to get message with blocking receive (no timeout)"); }
-//
-//                  Message m = null;
-//                  
-//                  try
-//                  {
-//                     m = cons.receive(0);                  
-//                  }
-//                  catch (JMSException e)
-//                  {
-//                     //If the consumer is closed, we will get a JMSException so we ignore
-//                     if (!closed)
-//                     {
-//                        throw e;
-//                     }                        
-//                  }          
-//                  
-//                  if (m != null)
-//                  {
-//                     if (trace) { log.trace("receive (no timeout) got message " + m + " adding to queue"); }
-//                     mesList.add(m);
-//                  }
-//                  else
-//                  {
-//                     // The consumer must have closed
-//                     if (trace) { log.trace("blocking receive returned null, consumer must have closed"); }
-//                     break;
-//                  }
-//               }
-//            }
-//            
-//            if (!mesList.isEmpty())
-//            {
-//               if (trace) { log.trace("there are " + mesList.size() + " messages to send to session"); }
-//
-//               ServerSession serverSession = serverSessionPool.getServerSession();
-//               JBossSession session = (JBossSession)serverSession.getSession();
-//
-//               MessageListener listener = session.getMessageListener();
-//
-//               if (listener == null)
-//               {
-//                  // Sanity check
-//                  if (trace) { log.trace(this + ": session " + session + " did not have a set MessageListener"); }
-//               }
-//
-//               for (int i = 0; i < mesList.size(); i++)
-//               {
-//                  JBossMessage m = (JBossMessage)mesList.get(i);
-//                  session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess);
-//                  if (trace) { log.trace("added " + m + " to session"); }
-//               }
-//
-//               if (trace) { log.trace(this + " starting serverSession " + serverSession); }
-//
-//               serverSession.start();
-//
-//               if (trace) { log.trace(this + "'s serverSession processed messages"); }
-//
-//               mesList.clear();
-//            }            
-//         }
-//         if (trace) { log.trace("ConnectionConsumer run() exiting"); }
-//      }
-//      catch (Throwable t)
-//      {
-//         log.debug("Connection consumer closing due to error in listening thread " + this, t);
-//         
-//         try
-//         {
-//            //Closing
-//            doClose();
-//         }
-//         catch (JMSException e)
-//         {
-//            log.error("Failed to close connection consumer", e);
-//         }
-//      }            
-   }
-   
-   protected synchronized void doClose() throws JMSException
-   {
-//      if (closed)
-//      {
-//         return;
-//      }
-//      
-//      closed = true;            
-//      
-//      sess.closing();
-//      sess.close();
-
-      if (trace) { log.trace(this + "Closed message handler"); }
-   }
-
-   // Public --------------------------------------------------------
-
-   public String toString()
-   {
-      return "JBossConnectionConsumer[" + consumerID + ", " + id + "]";
-   }
-
-   // Object overrides ----------------------------------------------
-
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-  
-}

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-06-23 09:45:38 UTC (rev 4547)
@@ -22,6 +22,41 @@
 
 package org.jboss.messaging.jms.client;
 
+import java.io.Serializable;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.transaction.xa.XAResource;
+
 import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -30,17 +65,18 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.*;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.transaction.xa.XAResource;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.UUID;
-
 /**
+ * 
+ * Note that we *do not* support JMS ASF (Application Server Facilities) optional
+ * constructs such as ConnectionConsumer
+ * 
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
@@ -76,11 +112,7 @@
    private final boolean transacted;
    
    private final boolean xa;
-   
-   private LinkedList<AsfMessageHolder> asfMessages;
-   
-   private MessageListener distinguishedListener;
-      
+        
    private boolean recoverCalled;
       
    // Constructors --------------------------------------------------
@@ -257,44 +289,16 @@
    {
       checkClosed();
       
-      return distinguishedListener;
+      return null;
    }
 
    public void setMessageListener(final MessageListener listener) throws JMSException
    {
-      checkClosed();
-      
-      this.distinguishedListener = listener;
+      checkClosed();     
    }
    
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
    public void run()
    {
-//      try
-//      {
-//         if (asfMessages != null)
-//         {         
-//            while (asfMessages.size() > 0)
-//            {
-//               AsfMessageHolder holder = (AsfMessageHolder)asfMessages.removeFirst();
-//                    
-//               session.preDeliver(holder.getMsg().getDeliveryId());
-//               
-//               session.postDeliver();
-//               
-//               distinguishedListener.onMessage(holder.getMsg());
-//            }
-//         }
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("Failed to process ASF messages", e);
-//      }
-      
-      //Need to work out how to get ASF to work with core
    }
 
    public MessageProducer createProducer(final Destination destination) throws JMSException
@@ -825,27 +829,7 @@
    }
 
    // Package protected ---------------------------------------------
-   
-   /*
-    * This method is used by the JBossConnectionConsumer to load up the session
-    * with messages to be processed by the session's run() method
-    */
-   void addAsfMessage(final JBossMessage m, final String consumerID, final String queueName, final int maxDeliveries,
-                      final ClientSession connectionConsumerSession) throws JMSException
-   {
-      
-      AsfMessageHolder holder =
-         new AsfMessageHolder(m, consumerID, queueName, maxDeliveries,
-                              connectionConsumerSession);
-
-      if (asfMessages == null)
-      {
-         asfMessages = new LinkedList<AsfMessageHolder>();
-      }
-      
-      asfMessages.add(holder);      
-   }
-      
+     
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------

Deleted: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-06-23 09:14:28 UTC (rev 4546)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-06-23 09:45:38 UTC (rev 4547)
@@ -1,594 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.test.messaging.jms;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.messaging.jms.client.JBossConnectionConsumer;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-
-/**
- * ConnectionConsumer tests
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ConnectionConsumerTest extends JMSTestCase
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ConnectionConsumerTest(String name)
-   {
-      super(name);
-   }
-
-   // TestCase overrides -------------------------------------------   
-
-   // Public --------------------------------------------------------
-
-   public void testSimple() throws Exception
-   {
-      if (ServerManagement.isRemote()) return;
-
-      final int NUM_MESSAGES = 100;
-
-      Connection connConsumer = null;
-
-      Connection connProducer = null;
-
-      try
-      {
-         connConsumer = cf.createConnection();
-
-         connConsumer.start();
-
-         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         SimpleMessageListener listener = new SimpleMessageListener(NUM_MESSAGES);
-
-         sessCons.setMessageListener(listener);
-
-         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
-         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
-         connProducer = cf.createConnection();
-
-         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer prod = sessProd.createProducer(queue1);
-
-         forceGC();
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage m = sessProd.createTextMessage("testing testing");
-            prod.send(m);
-         }
-
-         //Wait for messages
-
-         listener.waitForLatch(10000);
-
-         if (listener.getMsgsReceived() != NUM_MESSAGES)
-         {
-            fail("Didn't receive all messages");
-         }
-
-         if (listener.failed)
-         {
-            fail ("Didn't receive correct messages");
-         }
-
-         cc.close();
-
-         connProducer.close();
-         connProducer = null;
-         connConsumer.close();
-         connConsumer = null;
-      }
-      finally
-      {
-         if (connConsumer != null) connConsumer.close();
-         if (connProducer != null) connProducer.close();
-      }
-   }
-
-
-   
-   public void testRedeliveryTransacted() throws Exception
-   {
-      if (ServerManagement.isRemote()) return;
-      
-      Connection connConsumer = null;
-      
-      Connection connProducer = null;
-      
-      try
-      {
-         connConsumer = cf.createConnection();        
-         
-         connConsumer.start();
-                  
-         Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
-         
-         RedelMessageListener listener = new RedelMessageListener(sessCons);
-         
-         sessCons.setMessageListener(listener);
-         
-         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-         
-         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);         
-         
-         connProducer = cf.createConnection();
-            
-         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer prod = sessProd.createProducer(queue1);
-            
-         TextMessage m1 = sessProd.createTextMessage("a");
-         TextMessage m2 = sessProd.createTextMessage("b");
-         TextMessage m3 = sessProd.createTextMessage("c");
-         prod.send(m1);
-         prod.send(m2);
-         prod.send(m3);
-            
-         //Wait for messages
-         
-         listener.waitForLatch(10000);                  
-         
-         if (listener.failed)
-         {
-            fail ("Didn't receive correct messages");
-         }
-         
-         cc.close();
-         
-         connProducer.close();
-         connProducer = null;
-         connConsumer.close();
-         connConsumer = null;            
-      }
-      finally 
-      {
-         if (connConsumer != null) connConsumer.close();
-         if (connConsumer != null) connProducer.close();
-      }
-   }
-      
-   
-   public void testRedeliveryTransactedDifferentConnection() throws Exception
-   {
-      if (ServerManagement.isRemote()) return;
-      
-      Connection connConnectionConsumer = null;
-      
-      Connection connConsumer = null;
-      
-      Connection connProducer = null;
-      
-      try
-      {
-         connConsumer = cf.createConnection();        
-         
-         connConsumer.start();
-                  
-         Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
-         
-         RedelMessageListener listener = new RedelMessageListener(sessCons);
-         
-         sessCons.setMessageListener(listener);
-         
-         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-         
-         connConnectionConsumer = cf.createConnection();
-         
-         connConnectionConsumer.start();
-         
-         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConnectionConsumer.createConnectionConsumer(queue1, null, pool, 1);         
-         
-         connProducer = cf.createConnection();
-            
-         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer prod = sessProd.createProducer(queue1);
-            
-         TextMessage m1 = sessProd.createTextMessage("a");
-         TextMessage m2 = sessProd.createTextMessage("b");
-         TextMessage m3 = sessProd.createTextMessage("c");
-         prod.send(m1);
-         prod.send(m2);
-         prod.send(m3);
-          
-         //Wait for messages
-         
-         listener.waitForLatch(10000);                  
-         
-         if (listener.failed)
-         {
-            fail ("Didn't receive correct messages");
-         }
-         
-         cc.close();
-           
-         connProducer.close();
-         connProducer = null;
-         connConsumer.close();
-         connConsumer = null;
-         connConnectionConsumer.close();
-         connConnectionConsumer = null;    
-      }
-      finally 
-      {
-         if (connConsumer != null) connConsumer.close();
-         if (connConsumer != null) connProducer.close();
-         if (connConnectionConsumer != null) connConnectionConsumer.close();
-      }
-   }
-
-   public void testCloseWhileProcessing() throws Exception
-   {
-      if (ServerManagement.isRemote()) return;
-
-      final int NUM_MESSAGES = 100;
-
-      Connection connConsumer = null;
-
-      Connection connProducer = null;
-
-      try
-      {
-         connConsumer = cf.createConnection();
-
-         connConsumer.start();
-
-         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         SimpleMessageListener listener = new SimpleMessageListener(NUM_MESSAGES);
-
-         sessCons.setMessageListener(listener);
-
-         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
-         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
-         connProducer = cf.createConnection();
-
-         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer prod = sessProd.createProducer(queue1);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage m = sessProd.createTextMessage("testing testing");
-            prod.send(m);
-         }
-
-         cc.close();
-
-         connProducer.close();
-         connProducer = null;
-         connConsumer.close();
-         connConsumer = null;
-      }
-      finally
-      {
-         if (connConsumer != null) connConsumer.close();
-         if (connConsumer != null) connProducer.close();
-
-         removeAllMessages(queue1.getQueueName(), true, 0);
-      }
-   }
-
-   public void testStopWhileProcessing() throws Exception
-   {
-      if (ServerManagement.isRemote()) return;
-
-
-      Connection connConsumer = null;
-
-      try
-      {
-         connConsumer = cf.createConnection();
-
-         connConsumer.start();
-
-         Session sessCons = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         SimpleMessageListener listener = new SimpleMessageListener(0);
-
-         sessCons.setMessageListener(listener);
-
-         ServerSessionPool pool = new MockServerSessionPool(sessCons);
-
-         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);
-
-         stopServerPeer();
-         connConsumer.close();
-         startServerPeer();
-         deployAndLookupAdministeredObjects();
-         connConsumer = null;
-      }
-      finally
-      {
-         if (connConsumer != null) connConsumer.close();
-      }
-   }
-
-
-   class SimpleMessageListener implements MessageListener
-   {
-      CountDownLatch latch = new CountDownLatch(1);
-
-      boolean failed;
-      
-      int msgsReceived;
-      
-      int numExpectedMsgs;
-       
-      SimpleMessageListener(int numExpectedMsgs)
-      {
-         this.numExpectedMsgs = numExpectedMsgs;
-         
-      }
-      
-      synchronized void incMsgsReceived()
-      {
-         msgsReceived++;
-         if (msgsReceived == numExpectedMsgs)
-         {
-            latch.countDown();
-         }
-         
-      }
-      
-      synchronized int getMsgsReceived()
-      {
-         return msgsReceived;
-      }
-      
-      
-      void waitForLatch(long timeout) throws Exception
-      {
-         latch.await(timeout, MILLISECONDS);
-         //Thread.sleep(2000); //Enough time for postDeliver to complete  
-      }
-      
-      public synchronized void onMessage(Message message)
-      {
-         try
-         {                       
-            TextMessage tm = (TextMessage)message;
-            
-            if (!tm.getText().equals("testing testing"))
-            {
-               failed = true;
-            }
-            
-            incMsgsReceived();
-            
-         }
-         catch (Exception e)
-         {
-            log.error(e);
-            failed = true;
-         }
-      }
-   }
-   
-   class RedelMessageListener implements MessageListener
-   {
-      CountDownLatch latch = new CountDownLatch(1);
-
-      boolean failed;
-      
-      int count;
-      
-      Session sess;
-       
-      RedelMessageListener(Session sess)
-      {
-         this.sess = sess;   
-      }
-      
-      void waitForLatch(long timeout) throws Exception
-      {
-         latch.await(timeout, MILLISECONDS);
-      }
-      
-      public synchronized void onMessage(Message message)
-      {
-         try
-         {
-            count++;
-             
-            TextMessage tm = (TextMessage)message;
-            
-            log.trace("Got message " + tm.getText() + " count=" + count);
-            
-            if (count == 1)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("a"))
-               {
-                  log.trace("Expected a but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-            }
-            if (count == 2)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("b"))
-               {
-                  log.trace("Expected b but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-            }
-            if (count == 3)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("c"))
-               {
-                  log.trace("Expected c but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-               else
-               {
-                  if (sess.getAcknowledgeMode() == Session.SESSION_TRANSACTED)
-                  {
-                     log.trace("Rolling back");
-                     sess.rollback();
-                  }                  
-               }
-            }
-            if (count == 4)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("a"))
-               {
-                  log.trace("Expected a but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-               if (!tm.getJMSRedelivered())
-               {
-
-                  failed = true;
-                  latch.countDown();
-               }
-            }
-            if (count == 5)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("b"))
-               {
-                  log.trace("Expected b but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-               if (!tm.getJMSRedelivered())
-               {
-                  log.trace("Redelivered flag not set");
-                  failed = true;
-                  latch.countDown();
-               }
-            }
-            if (count == 6)
-            {
-               log.trace("delivery count:" + tm.getIntProperty("JMSXDeliveryCount"));
-               
-               if (!tm.getText().equals("c"))
-               {
-                  log.trace("Expected c but was " + tm.getText());
-                  failed = true;
-                  latch.countDown();
-               }
-               if (!tm.getJMSRedelivered())
-               {
-                  log.trace("Redelivered flag not set");
-                  failed = true;
-                  latch.countDown();
-               }
-               else
-               {
-                  if (sess.getAcknowledgeMode() == Session.SESSION_TRANSACTED)
-                  {
-                     log.trace("Committing");
-                     sess.commit();
-                  }                 
-                  latch.countDown();
-               }
-            }
-            
-         }
-         catch (JMSException e)
-         {
-            log.error(e);
-            failed = true;
-         }  
-      }      
-   }
-   
-   
-   class MockServerSessionPool implements ServerSessionPool
-   {
-      private ServerSession serverSession;
-      
-      MockServerSessionPool(Session sess)
-      {
-         serverSession = new MockServerSession(sess);
-      }
-
-      public ServerSession getServerSession() throws JMSException
-      {
-         return serverSession;
-      }      
-   }
-   
-   class MockServerSession implements ServerSession
-   {
-      Session session;
-      
-      MockServerSession(Session sess)
-      {
-         this.session = sess;
-      }
-      
-
-      public Session getSession() throws JMSException
-      {
-         return session;
-      }
-
-      public void start() throws JMSException
-      {
-         session.run();
-      }
-      
-   }
-   
-}




More information about the jboss-cvs-commits mailing list