[jboss-cvs] JBoss Messaging SVN: r5142 - in trunk: src/main/org/jboss/messaging/core/client/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Oct 18 14:20:09 EDT 2008
Author: timfox
Date: 2008-10-18 14:20:08 -0400 (Sat, 18 Oct 2008)
New Revision: 5142
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
Log:
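Fixed browsing code: removed the createQueueCopy session API and its SessionCreateQueueCopyMessage packet; JBossQueueBrowser now creates a browse-only consumer directly on the queue, and the server serves it from a temporary filtered copy of the queue (acknowledgements from browse-only consumers are ignored).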
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -120,6 +120,4 @@
boolean removeFailureListener(FailureListener listener);
int getVersion();
-
- void createQueueCopy(SimpleString queueName, SimpleString queueCopyName, SimpleString coreSelector, boolean durable, boolean temporary) throws MessagingException;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -21,6 +21,19 @@
*/
package org.jboss.messaging.core.client.impl;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -46,7 +59,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -75,18 +87,6 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -591,15 +591,6 @@
return version;
}
- public void createQueueCopy(SimpleString queueName, SimpleString queueCopyName, SimpleString coreSelector, boolean durable, boolean temporary) throws MessagingException
- {
- checkClosed();
-
- SessionCreateQueueCopyMessage sessionCreateQueueCopyMessage = new SessionCreateQueueCopyMessage(queueCopyName, queueName, coreSelector, durable, temporary);
-
- channel.sendBlocking(sessionCreateQueueCopyMessage);
- }
-
// ClientSessionInternal implementation
// ------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -46,8 +46,6 @@
Map<SimpleString, ClientProducerInternal> getProducerCache();
- //void cleanUp() throws Exception;
-
void receiveProducerCredits(long producerID, int credits) throws Exception;
void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -12,20 +12,6 @@
package org.jboss.messaging.core.remoting.impl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -81,6 +67,37 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -97,7 +114,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -128,22 +144,6 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleIDGenerator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -629,11 +629,6 @@
packet = new SessionCreateQueueMessage();
break;
}
- case SESS_CREATEQUEUECOPY:
- {
- packet = new SessionCreateQueueCopyMessage();
- break;
- }
case SESS_DELETE_QUEUE:
{
packet = new SessionDeleteQueueMessage();
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueCopyMessage.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -1,152 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
-
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateQueueCopyMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString queueCopyName;
- private SimpleString queueName;
- private SimpleString filterString;
- private boolean durable;
- private boolean temporary;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateQueueCopyMessage(final SimpleString queueCopyName, final SimpleString queueName,
- final SimpleString filterString, final boolean durable, final boolean temporary)
- {
- super(SESS_CREATEQUEUECOPY);
-
- this.queueCopyName = queueCopyName;
- this.queueName = queueName;
- this.filterString = filterString;
- this.durable = durable;
- this.temporary = temporary;
- }
-
- public SessionCreateQueueCopyMessage()
- {
- super(SESS_CREATEQUEUECOPY);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public String toString()
- {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", queueCopyName=" + queueCopyName);
- buff.append(", queueName=" + queueName);
- buff.append(", filterString=" + filterString);
- buff.append(", durable=" + durable);
- buff.append(", temporary=" + temporary);
- buff.append("]");
- return buff.toString();
- }
-
- public SimpleString getQueueCopyName()
- {
- return queueCopyName;
- }
-
- public SimpleString getQueueName()
- {
- return queueName;
- }
-
- public SimpleString getFilterString()
- {
- return filterString;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public boolean isTemporary()
- {
- return temporary;
- }
-
- public boolean isRequiresGlobalOrdering()
- {
- return true;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putSimpleString(queueCopyName);
- buffer.putSimpleString(queueName);
- buffer.putNullableSimpleString(filterString);
- buffer.putBoolean(durable);
- buffer.putBoolean(temporary);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- queueCopyName = buffer.getSimpleString();
- queueName = buffer.getSimpleString();
- filterString = buffer.getNullableSimpleString();
- durable = buffer.getBoolean();
- temporary = buffer.getBoolean();
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionCreateQueueCopyMessage == false)
- {
- return false;
- }
-
- SessionCreateQueueCopyMessage r = (SessionCreateQueueCopyMessage)other;
-
- return super.equals(other) && r.queueCopyName.equals(this.queueCopyName) &&
- r.queueName.equals(this.queueName) &&
- (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
- r.durable == this.durable &&
- r.temporary == this.temporary;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -103,13 +103,7 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
-
- void createQueueCopy(final SimpleString queue,
- final SimpleString queueCopy,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception;
-
+
void deleteQueue(SimpleString queueName) throws Exception;
SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -208,7 +208,8 @@
if (backup)
{
- return;
+ log.info("backup");
+ return ;
}
MessageReference reference;
@@ -256,7 +257,7 @@
else
{
iterator.remove();
- }
+ }
}
else if (status == HandleStatus.BUSY)
{
@@ -270,7 +271,7 @@
// through the queue
iterator = messageReferences.iterator();
}
- }
+ }
}
public synchronized void addConsumer(final Consumer consumer)
@@ -778,6 +779,7 @@
private HandleStatus deliver(final MessageReference reference)
{
HandleStatus status = distributionPolicy.distribute(reference);
+
if (status == HandleStatus.HANDLED)
{
deliveringCount.incrementAndGet();
@@ -787,6 +789,7 @@
{
promptDelivery = true;
}
+
return status;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -300,6 +300,11 @@
public MessageReference getReference(final long messageID) throws Exception
{
+ if (browseOnly)
+ {
+ return null;
+ }
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -12,6 +12,23 @@
package org.jboss.messaging.core.server.impl;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
@@ -51,23 +68,8 @@
import org.jboss.messaging.util.SimpleIDGenerator;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.SimpleStringIdGenerator;
+import org.jboss.messaging.util.UUIDGenerator;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
/*
* Session implementation
*
@@ -137,8 +139,6 @@
private final SimpleStringIdGenerator simpleStringIdGenerator;
- private final Object queueCopyLock = new Object();
-
// Constructors ---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -296,19 +296,16 @@
{
if (!pager.page(msg))
{
- synchronized (queueCopyLock)
+ List<MessageReference> refs = postOffice.route(msg);
+
+ if (msg.getDurableRefCount() != 0)
{
- List<MessageReference> refs = postOffice.route(msg);
+ storageManager.storeMessage(msg);
+ }
- if (msg.getDurableRefCount() != 0)
- {
- storageManager.storeMessage(msg);
- }
-
- for (MessageReference ref : refs)
- {
- ref.getQueue().addLast(ref);
- }
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
}
}
}
@@ -355,19 +352,23 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
MessageReference ref = consumers.get(consumerID).getReference(messageID);
-
- if (autoCommitAcks)
+
+ //Null implies a browser
+ if (ref != null)
{
- doAck(ref);
+ if (autoCommitAcks)
+ {
+ doAck(ref);
+ }
+ else
+ {
+ tx.addAcknowledgement(ref);
+
+ // Del count is not actually updated in storage unless it's
+ // cancelled
+ ref.incrementDeliveryCount();
+ }
}
- else
- {
- tx.addAcknowledgement(ref);
-
- // Del count is not actually updated in storage unless it's
- // cancelled
- ref.incrementDeliveryCount();
- }
}
public void rollback() throws Exception
@@ -413,16 +414,16 @@
if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
{
Queue queue = ref.getQueue();
-
+
LinkedList<MessageReference> list = queueMap.get(queue);
-
+
if (list == null)
{
list = new LinkedList<MessageReference>();
-
+
queueMap.put(queue, list);
}
-
+
list.add(ref);
}
}
@@ -825,37 +826,6 @@
}
}
- public void createQueueCopy(final SimpleString queue,
- final SimpleString queueCopy,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
- {
- Binding binding = postOffice.getBinding(queue);
- if (binding == null)
- {
- throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
- }
- //we need to stop messages being routed whilst we make a copy of the queue. altho this will be extremely quick
- // even for large quantities of references
- synchronized (queueCopyLock)
- {
- createQueue(binding.getAddress(), queueCopy, filterString, durable, temporary);
- Queue newQ = postOffice.getBinding(queueCopy).getQueue();
- Filter filter = null;
-
- if (filterString != null)
- {
- filter = new FilterImpl(filterString);
- }
- List<MessageReference> refs = binding.getQueue().list(filter);
- for (MessageReference ref : refs)
- {
- newQ.addLast(ref.getMessage().createReference(newQ));
- }
- }
- }
-
public void deleteQueue(final SimpleString queueName) throws Exception
{
Binding binding = postOffice.removeBinding(queueName);
@@ -913,9 +883,37 @@
maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+ Queue theQueue;
+ if (browseOnly)
+ {
+ // We consume a copy of the queue - TODO - this is a temporary measure
+ // and will disappear once we can provide a proper iterator on the queue
+
+ theQueue = new QueueImpl(-1,
+ queueName,
+ filter,
+ false,
+ false,
+ false,
+ null,
+ postOffice);
+
+ //There's no need for any special locking since the list method is synchronized
+ List<MessageReference> refs = binding.getQueue().list(filter);
+
+ for (MessageReference ref : refs)
+ {
+ theQueue.addLast(ref);
+ }
+ }
+ else
+ {
+ theQueue = binding.getQueue();
+ }
+
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
- binding.getQueue(),
+ theQueue,
filter,
windowSize != -1,
maxRate,
@@ -1041,7 +1039,6 @@
return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
-
public void closeConsumer(final long consumerID) throws Exception
{
consumers.get(consumerID).close();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -13,15 +13,6 @@
package org.jboss.messaging.core.server.impl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
@@ -31,7 +22,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUECOPY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -57,6 +47,20 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -64,7 +68,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueCopyMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
@@ -89,9 +92,6 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
-import javax.transaction.xa.Xid;
-import java.util.List;
-
/**
* A ServerSessionPacketHandler
*
@@ -174,17 +174,6 @@
response = new NullResponseMessage();
break;
}
- case SESS_CREATEQUEUECOPY:
- {
- SessionCreateQueueCopyMessage request = (SessionCreateQueueCopyMessage)packet;
- session.createQueueCopy(request.getQueueName(),
- request.getQueueCopyName(),
- request.getFilterString(),
- request.isDurable(),
- request.isTemporary());
- response = new NullResponseMessage();
- break;
- }
case SESS_DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -22,6 +22,14 @@
package org.jboss.messaging.jms.client;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
@@ -30,13 +38,6 @@
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.util.SimpleString;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -61,19 +62,17 @@
private JBossQueue queue;
- private SimpleString messageSelector;
+ private SimpleString filterString;
- private SimpleString queueName;
-
// Constructors ---------------------------------------------------------------------------------
- public JBossQueueBrowser(JBossQueue queue, String messageSelector, ClientSession session)
+ public JBossQueueBrowser(JBossQueue queue, String messageSelector, ClientSession session) throws JMSException
{
this.session = session;
this.queue = queue;
- if(messageSelector != null)
+ if (messageSelector != null)
{
- this. messageSelector = new SimpleString(SelectorTranslator.convertToJBMFilterString(messageSelector));
+ this.filterString = new SimpleString(SelectorTranslator.convertToJBMFilterString(messageSelector));
}
}
@@ -86,7 +85,6 @@
try
{
consumer.close();
- session.deleteQueue(queueName);
}
catch (MessagingException e)
{
@@ -100,9 +98,9 @@
try
{
close();
- queueName = new SimpleString(UUID.randomUUID().toString());
- session.createQueueCopy(queue.getSimpleAddress(), queueName, messageSelector, false, true);
- consumer = session.createConsumer(queueName, null, false, true);
+
+ consumer = session.createConsumer(queue.getSimpleAddress(), filterString, false, true);
+
return new BrowserEnumeration();
}
catch (MessagingException e)
@@ -114,7 +112,7 @@
public String getMessageSelector() throws JMSException
{
- return messageSelector == null?null:messageSelector.toString();
+ return filterString == null ? null : filterString.toString();
}
public Queue getQueue() throws JMSException
@@ -147,7 +145,8 @@
{
try
{
- //todo change this to consumer.receiveImmediate() once https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
+ // todo change this to consumer.receiveImmediate() once
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
current = consumer.receive(NEXT_MESSAGE_TIMEOUT);
}
catch (MessagingException e)
@@ -172,7 +171,7 @@
}
catch (Exception e)
{
- log.error("Failed to prepare message", e);
+ log.error("Failed to create message", e);
return null;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -80,7 +80,7 @@
consumer.setStatusImmediate(HandleStatus.HANDLED);
queue.deliver();
-
+
if (sender.getException() != null)
{
throw sender.getException();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java 2008-10-18 09:52:29 UTC (rev 5141)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java 2008-10-18 18:20:08 UTC (rev 5142)
@@ -22,12 +22,19 @@
package org.jboss.messaging.tests.unit.jms.client;
-import junit.framework.TestCase;
-import org.easymock.EasyMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+
+import junit.framework.TestCase;
+
+import org.easymock.EasyMock;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientSession;
@@ -35,12 +42,8 @@
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.messaging.jms.client.JBossQueueBrowser;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import org.jboss.messaging.util.SimpleString;
-import javax.jms.Message;
-import java.util.Enumeration;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -127,8 +130,7 @@
{
JBossQueue queue = new JBossQueue(randomString());
ClientConsumer consumer = createStrictMock(ClientConsumer.class);
- ClientSession session = createStrictMock(ClientSession.class);
- session.createQueueCopy((SimpleString) EasyMock.eq(queue.getSimpleAddress()), (SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.eq(null), EasyMock.anyBoolean(),EasyMock.anyBoolean());
+ ClientSession session = createStrictMock(ClientSession.class);
expect(session.createConsumer((SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.anyObject(), EasyMock.anyBoolean(), EasyMock.anyBoolean())).andReturn(consumer);
replay(session, consumer);
@@ -147,8 +149,7 @@
ClientMessage clientMessage = createStrictMock(ClientMessage.class);
MessagingBuffer buffer = createStrictMock(MessagingBuffer.class);
ClientConsumer consumer = createStrictMock(ClientConsumer.class);
- ClientSession session = createStrictMock(ClientSession.class);
- session.createQueueCopy((SimpleString) EasyMock.eq(queue.getSimpleAddress()), (SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.eq(null), EasyMock.anyBoolean(),EasyMock.anyBoolean());
+ ClientSession session = createStrictMock(ClientSession.class);
expect(session.createConsumer((SimpleString) EasyMock.anyObject(), (SimpleString) EasyMock.anyObject(), EasyMock.anyBoolean(), EasyMock.anyBoolean())).andReturn(consumer);
expect(consumer.receive(1000)).andReturn(clientMessage);
expect(clientMessage.getType()).andReturn(JBossMessage.TYPE);
More information about the jboss-cvs-commits
mailing list