[jboss-cvs] JBoss Messaging SVN: r5142 - in trunk: src/main/org/jboss/messaging/core/client/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Oct 18 14:20:09 EDT 2008


Author: timfox
Date: 2008-10-18 14:20:08 -0400 (Sat, 18 Oct 2008)
New Revision: 5142

Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
   trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
Log:
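Fixed browsing code: removed the createQueueCopy API and its SessionCreateQueueCopyMessage packet; a browse-only consumer now reads from a server-side copy of the queue built in ServerSessionImpl, and acknowledgements from browse-only consumers are ignored.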


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -120,6 +120,4 @@
    boolean removeFailureListener(FailureListener listener);
 
    int getVersion();
-
-    void createQueueCopy(SimpleString queueName, SimpleString queueCopyName, SimpleString coreSelector, boolean durable, boolean temporary) throws MessagingException;
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -21,6 +21,19 @@
  */
 package org.jboss.messaging.core.client.impl;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -46,7 +59,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -75,18 +87,6 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -591,15 +591,6 @@
       return version;
    }
 
-   public void createQueueCopy(SimpleString queueName, SimpleString queueCopyName, SimpleString coreSelector, boolean durable, boolean temporary) throws MessagingException
-   {
-      checkClosed();
-
-      SessionCreateQueueCopyMessage sessionCreateQueueCopyMessage = new SessionCreateQueueCopyMessage(queueCopyName, queueName, coreSelector,  durable, temporary);
-
-      channel.sendBlocking(sessionCreateQueueCopyMessage);
-   }
-
    // ClientSessionInternal implementation
    // ------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -46,8 +46,6 @@
 
    Map<SimpleString, ClientProducerInternal> getProducerCache();
 
-   //void cleanUp() throws Exception;
-
    void receiveProducerCredits(long producerID, int credits) throws Exception;
 
    void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -12,20 +12,6 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -81,6 +67,37 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -97,7 +114,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -128,22 +144,6 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleIDGenerator;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -629,11 +629,6 @@
             packet = new SessionCreateQueueMessage();
             break;
          }
-         case SESS_CREATEQUEUECOPY:
-         {
-            packet = new SessionCreateQueueCopyMessage();
-            break;
-         }
          case SESS_DELETE_QUEUE:
          {
             packet = new SessionDeleteQueueMessage();

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -1,152 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
-
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateQueueCopyMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private SimpleString queueCopyName;
-   private SimpleString queueName;
-   private SimpleString filterString;
-   private boolean durable;
-   private boolean temporary;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionCreateQueueCopyMessage(final SimpleString queueCopyName, final SimpleString queueName,
-   		final SimpleString filterString, final boolean durable, final boolean temporary)
-   {
-      super(SESS_CREATEQUEUECOPY);
-
-      this.queueCopyName = queueCopyName;
-      this.queueName = queueName;
-      this.filterString = filterString;
-      this.durable = durable;
-      this.temporary = temporary;
-   }
-
-   public SessionCreateQueueCopyMessage()
-   {
-      super(SESS_CREATEQUEUECOPY);
-   }
-
-   // Public --------------------------------------------------------
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buff = new StringBuffer(getParentString());
-      buff.append(", queueCopyName=" + queueCopyName);
-      buff.append(", queueName=" + queueName);
-      buff.append(", filterString=" + filterString);
-      buff.append(", durable=" + durable);
-      buff.append(", temporary=" + temporary);
-      buff.append("]");
-      return buff.toString();
-   }
-
-   public SimpleString getQueueCopyName()
-   {
-      return queueCopyName;
-   }
-
-   public SimpleString getQueueName()
-   {
-      return queueName;
-   }
-
-   public SimpleString getFilterString()
-   {
-      return filterString;
-   }
-
-   public boolean isDurable()
-   {
-      return durable;
-   }
-
-   public boolean isTemporary()
-   {
-      return temporary;
-   }
-
-   public boolean isRequiresGlobalOrdering()
-   {
-      return true;
-   }
-
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putSimpleString(queueCopyName);
-      buffer.putSimpleString(queueName);
-      buffer.putNullableSimpleString(filterString);
-      buffer.putBoolean(durable);
-      buffer.putBoolean(temporary);
-   }
-
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      queueCopyName = buffer.getSimpleString();
-      queueName = buffer.getSimpleString();
-      filterString = buffer.getNullableSimpleString();
-      durable = buffer.getBoolean();
-      temporary = buffer.getBoolean();
-   }
-
-   public boolean equals(Object other)
-   {
-      if (other instanceof SessionCreateQueueCopyMessage == false)
-      {
-         return false;
-      }
-
-      SessionCreateQueueCopyMessage r = (SessionCreateQueueCopyMessage)other;
-
-      return super.equals(other) && r.queueCopyName.equals(this.queueCopyName) && 
-             r.queueName.equals(this.queueName) &&
-             (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
-             r.durable == this.durable &&
-             r.temporary == this.temporary;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -103,13 +103,7 @@
                     SimpleString filterString,
                     boolean durable,
                     boolean temporary) throws Exception;
-
-   void createQueueCopy(final SimpleString queue,
-                        final SimpleString queueCopy,
-                        final SimpleString filterString,
-                        final boolean durable,
-                        final boolean temporary) throws Exception;
-
+  
    void deleteQueue(SimpleString queueName) throws Exception;
 
    SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -208,7 +208,8 @@
 
       if (backup)
       {
-         return;
+         log.info("backup");
+         return ;
       }
 
       MessageReference reference;
@@ -256,7 +257,7 @@
             else
             {
                iterator.remove();
-            }
+            }            
          }
          else if (status == HandleStatus.BUSY)
          {
@@ -270,7 +271,7 @@
             // through the queue
             iterator = messageReferences.iterator();
          }
-      }
+      }     
    }
 
    public synchronized void addConsumer(final Consumer consumer)
@@ -778,6 +779,7 @@
    private HandleStatus deliver(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
+      
       if (status == HandleStatus.HANDLED)
       {
          deliveringCount.incrementAndGet();
@@ -787,6 +789,7 @@
       {
          promptDelivery = true;
       }
+      
       return status;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -300,6 +300,11 @@
 
    public MessageReference getReference(final long messageID) throws Exception
    {
+      if (browseOnly)
+      {
+         return null;
+      }
+      
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -12,6 +12,23 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
@@ -51,23 +68,8 @@
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.SimpleStringIdGenerator;
+import org.jboss.messaging.util.UUIDGenerator;
 
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
 /*
  * Session implementation 
  * 
@@ -137,8 +139,6 @@
 
    private final SimpleStringIdGenerator simpleStringIdGenerator;
 
-   private final Object queueCopyLock = new Object();
-
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -296,19 +296,16 @@
       {
          if (!pager.page(msg))
          {
-            synchronized (queueCopyLock)
+            List<MessageReference> refs = postOffice.route(msg);
+
+            if (msg.getDurableRefCount() != 0)
             {
-               List<MessageReference> refs = postOffice.route(msg);
+               storageManager.storeMessage(msg);
+            }
 
-               if (msg.getDurableRefCount() != 0)
-               {
-                  storageManager.storeMessage(msg);
-               }
-
-               for (MessageReference ref : refs)
-               {
-                  ref.getQueue().addLast(ref);
-               }
+            for (MessageReference ref : refs)
+            {
+               ref.getQueue().addLast(ref);
             }
          }
       }
@@ -355,19 +352,23 @@
    public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       MessageReference ref = consumers.get(consumerID).getReference(messageID);
-
-      if (autoCommitAcks)
+      
+      //Null implies a browser
+      if (ref != null)
       {
-         doAck(ref);
+         if (autoCommitAcks)
+         {
+            doAck(ref);
+         }
+         else
+         {
+            tx.addAcknowledgement(ref);
+   
+            // Del count is not actually updated in storage unless it's
+            // cancelled
+            ref.incrementDeliveryCount();
+         }
       }
-      else
-      {
-         tx.addAcknowledgement(ref);
-
-         // Del count is not actually updated in storage unless it's
-         // cancelled
-         ref.incrementDeliveryCount();
-      }
    }
 
    public void rollback() throws Exception
@@ -413,16 +414,16 @@
          if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
          {
             Queue queue = ref.getQueue();
-   
+
             LinkedList<MessageReference> list = queueMap.get(queue);
-   
+
             if (list == null)
             {
                list = new LinkedList<MessageReference>();
-   
+
                queueMap.put(queue, list);
             }
-   
+
             list.add(ref);
          }
       }
@@ -825,37 +826,6 @@
       }
    }
 
-   public void createQueueCopy(final SimpleString queue,
-                               final SimpleString queueCopy,
-                               final SimpleString filterString,
-                               final boolean durable,
-                               final boolean temporary) throws Exception
-   {
-      Binding binding = postOffice.getBinding(queue);
-      if (binding == null)
-      {
-         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
-      }
-      //we need to stop messages being routed whilst we make a copy of the queue. altho this will be extremely quick
-      // even for large quantities of references
-      synchronized (queueCopyLock)
-      {
-         createQueue(binding.getAddress(), queueCopy, filterString, durable, temporary);
-         Queue newQ = postOffice.getBinding(queueCopy).getQueue();
-         Filter filter = null;
-
-         if (filterString != null)
-         {
-            filter = new FilterImpl(filterString);
-         }
-         List<MessageReference> refs = binding.getQueue().list(filter);
-         for (MessageReference ref : refs)
-         {
-            newQ.addLast(ref.getMessage().createReference(newQ));
-         }
-      }
-   }
-
    public void deleteQueue(final SimpleString queueName) throws Exception
    {
       Binding binding = postOffice.removeBinding(queueName);
@@ -913,9 +883,37 @@
 
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
 
+      Queue theQueue;
+      if (browseOnly)
+      {
+         // We consume a copy of the queue - TODO - this is a temporary measure
+         // and will disappear once we can provide a proper iterator on the queue
+
+         theQueue = new QueueImpl(-1,
+                                  queueName,
+                                  filter,
+                                  false,
+                                  false,
+                                  false,
+                                  null,
+                                  postOffice);
+
+         //There's no need for any special locking since the list method is synchronized
+         List<MessageReference> refs = binding.getQueue().list(filter);
+
+         for (MessageReference ref : refs)
+         {
+            theQueue.addLast(ref);
+         }
+      }
+      else
+      {
+         theQueue = binding.getQueue();
+      }
+
       ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                        this,
-                                                       binding.getQueue(),
+                                                       theQueue,
                                                        filter,
                                                        windowSize != -1,
                                                        maxRate,
@@ -1041,7 +1039,6 @@
       return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
    }
 
-
    public void closeConsumer(final long consumerID) throws Exception
    {
       consumers.get(consumerID).close();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -13,15 +13,6 @@
 package org.jboss.messaging.core.server.impl;
 
 
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
@@ -31,7 +22,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUECOPY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -57,6 +47,20 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -64,7 +68,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
@@ -89,9 +92,6 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
-import javax.transaction.xa.Xid;
-import java.util.List;
-
 /**
  * A ServerSessionPacketHandler
  *
@@ -174,17 +174,6 @@
                response = new NullResponseMessage();
                break;
             }
-            case SESS_CREATEQUEUECOPY:
-            {
-               SessionCreateQueueCopyMessage request = (SessionCreateQueueCopyMessage)packet;
-               session.createQueueCopy(request.getQueueName(),
-                                   request.getQueueCopyName(),
-                                   request.getFilterString(),
-                                   request.isDurable(),
-                                   request.isTemporary());
-               response = new NullResponseMessage();
-               break;
-            }
             case SESS_DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -22,6 +22,14 @@
 
 package org.jboss.messaging.jms.client;
 
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
@@ -30,13 +38,6 @@
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -61,19 +62,17 @@
 
    private JBossQueue queue;
 
-   private SimpleString messageSelector;
+   private SimpleString filterString;
 
-   private SimpleString queueName;
-
    // Constructors ---------------------------------------------------------------------------------
 
-   public JBossQueueBrowser(JBossQueue queue, String messageSelector, ClientSession session)
+   public JBossQueueBrowser(JBossQueue queue, String messageSelector, ClientSession session) throws JMSException
    {
       this.session = session;
       this.queue = queue;
-      if(messageSelector != null)
+      if (messageSelector != null)
       {
-        this. messageSelector = new SimpleString(SelectorTranslator.convertToJBMFilterString(messageSelector));
+         this.filterString = new SimpleString(SelectorTranslator.convertToJBMFilterString(messageSelector));
       }
    }
 
@@ -86,7 +85,6 @@
          try
          {
             consumer.close();
-            session.deleteQueue(queueName);
          }
          catch (MessagingException e)
          {
@@ -100,9 +98,9 @@
       try
       {
          close();
-         queueName = new SimpleString(UUID.randomUUID().toString());
-         session.createQueueCopy(queue.getSimpleAddress(), queueName, messageSelector, false, true);
-         consumer = session.createConsumer(queueName, null, false, true);
+
+         consumer = session.createConsumer(queue.getSimpleAddress(), filterString, false, true);
+
          return new BrowserEnumeration();
       }
       catch (MessagingException e)
@@ -114,7 +112,7 @@
 
    public String getMessageSelector() throws JMSException
    {
-      return messageSelector == null?null:messageSelector.toString();
+      return filterString == null ? null : filterString.toString();
    }
 
    public Queue getQueue() throws JMSException
@@ -147,7 +145,8 @@
          {
             try
             {
-               //todo change this to consumer.receiveImmediate() once https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
+               // todo change this to consumer.receiveImmediate() once
+               // https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
                current = consumer.receive(NEXT_MESSAGE_TIMEOUT);
             }
             catch (MessagingException e)
@@ -172,7 +171,7 @@
             }
             catch (Exception e)
             {
-               log.error("Failed to prepare message", e);
+               log.error("Failed to create message", e);
 
                return null;
             }

Modified: trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -80,7 +80,7 @@
       consumer.setStatusImmediate(HandleStatus.HANDLED);
       
       queue.deliver();
-      
+
       if (sender.getException() != null)
       {
          throw sender.getException();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java	2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java	2008-10-18 18:20:08 UTC (rev 5142)
@@ -22,12 +22,19 @@
 
 package org.jboss.messaging.tests.unit.jms.client;
 
-import junit.framework.TestCase;
-import org.easymock.EasyMock;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+
+import junit.framework.TestCase;
+
+import org.easymock.EasyMock;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
@@ -35,12 +42,8 @@
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.jms.client.JBossMessage;
 import org.jboss.messaging.jms.client.JBossQueueBrowser;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.jms.Message;
-import java.util.Enumeration;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  *
@@ -127,8 +130,7 @@
    {
       JBossQueue queue = new JBossQueue(randomString());
       ClientConsumer consumer = createStrictMock(ClientConsumer.class);
-      ClientSession session = createStrictMock(ClientSession.class);
-      session.createQueueCopy((SimpleString) EasyMock.eq(queue.getSimpleAddress()), (SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.eq(null), EasyMock.anyBoolean(),EasyMock.anyBoolean());
+      ClientSession session = createStrictMock(ClientSession.class);    
       expect(session.createConsumer((SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.anyObject(), EasyMock.anyBoolean(), EasyMock.anyBoolean())).andReturn(consumer);
       replay(session, consumer);
 
@@ -147,8 +149,7 @@
       ClientMessage clientMessage = createStrictMock(ClientMessage.class);
       MessagingBuffer buffer = createStrictMock(MessagingBuffer.class);
       ClientConsumer consumer = createStrictMock(ClientConsumer.class);
-      ClientSession session = createStrictMock(ClientSession.class);
-      session.createQueueCopy((SimpleString) EasyMock.eq(queue.getSimpleAddress()), (SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.eq(null), EasyMock.anyBoolean(),EasyMock.anyBoolean());
+      ClientSession session = createStrictMock(ClientSession.class);   
       expect(session.createConsumer((SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.anyObject(), EasyMock.anyBoolean(), EasyMock.anyBoolean())).andReturn(consumer);
       expect(consumer.receive(1000)).andReturn(clientMessage);
       expect(clientMessage.getType()).andReturn(JBossMessage.TYPE);




More information about the jboss-cvs-commits mailing list