Author: clebert.suconic(a)jboss.com
Date: 2011-03-22 09:57:39 -0400 (Tue, 22 Mar 2011)
New Revision: 10351
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-659 - small fix to fix a memory leak on clustering
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2011-03-22 03:47:43 UTC (rev 10350)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2011-03-22 13:57:39 UTC (rev 10351)
@@ -107,14 +107,16 @@
try
{
result = parser.parse(sfilterString, identifiers);
-
+
resultType = result.getClass();
}
catch (Throwable e)
{
FilterImpl.log.error("Invalid filter: " + str, e);
- throw new HornetQException(HornetQException.INVALID_FILTER_EXPRESSION, "Invalid filter: " + sfilterString + " " + e.getMessage());
+ throw new HornetQException(HornetQException.INVALID_FILTER_EXPRESSION, "Invalid filter: " + sfilterString +
+ " " +
+ e.getMessage());
}
}
@@ -173,6 +175,41 @@
}
/* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((sfilterString == null) ? 0 : sfilterString.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FilterImpl other = (FilterImpl)obj;
+ if (sfilterString == null)
+ {
+ if (other.sfilterString != null)
+ return false;
+ }
+ else if (!sfilterString.equals(other.sfilterString))
+ return false;
+ return true;
+ }
+
+ /* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
@@ -187,7 +224,7 @@
{
if (FilterConstants.HORNETQ_USERID.equals(fieldName))
{
- //It's the stringified (hex) representation of a user id that can be used in a selector expression
+ // It's the stringified (hex) representation of a user id that can be used in a selector expression
return new SimpleString("ID:" + msg.getUserID());
}
else if (FilterConstants.HORNETQ_PRIORITY.equals(fieldName))
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-22 03:47:43 UTC (rev 10350)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-22 13:57:39 UTC (rev 10351)
@@ -181,7 +181,7 @@
public void route(final ServerMessage message, final RoutingContext context)
{
addRouteContextToMessage(message);
-
+
List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
if (!durableQueuesOnContext.contains(storeAndForwardQueue))
@@ -203,7 +203,7 @@
if (i == null)
{
- filterCounts.put(filterString, 0);
+ filterCounts.put(filterString, 1);
filters.add(FilterImpl.createFilter(filterString));
}
@@ -230,7 +230,7 @@
{
filterCounts.remove(filterString);
- filters.remove(filterString);
+ filters.remove(FilterImpl.createFilter(filterString));
}
else
{
@@ -273,13 +273,17 @@
uniqueName +
"]";
}
-
+
+ public Set<Filter> getFilters()
+ {
+ return filters;
+ }
+
public void close() throws Exception
{
storeAndForwardQueue.close();
}
-
/**
* This will add routing information to the message.
* This will be later processed during the delivery between the nodes. Because of that this has to be persisted as a property on the message.
@@ -309,5 +313,4 @@
message.putBytesProperty(idsHeaderName, ids);
}
-
}
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-03-22 13:57:39 UTC (rev 10351)
@@ -0,0 +1,637 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.server.cluster.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.server.Consumer;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A RemoteQueueBindImplTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class RemoteQueueBindImplTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAddRemoveConsumer() throws Exception
+ {
+
+ final long id = RandomUtil.randomLong();
+ final SimpleString address = RandomUtil.randomSimpleString();
+ final SimpleString uniqueName = RandomUtil.randomSimpleString();
+ final SimpleString routingName = RandomUtil.randomSimpleString();
+ final Long remoteQueueID = RandomUtil.randomLong();
+ final SimpleString filterString = new SimpleString("A>B");
+ final Queue storeAndForwardQueue = new FakeQueue();
+ final SimpleString bridgeName = RandomUtil.randomSimpleString();
+ final int distance = 0;
+ RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id,
+ address,
+ uniqueName,
+ routingName,
+ remoteQueueID,
+ filterString,
+ storeAndForwardQueue,
+ bridgeName,
+ distance);
+
+ for (int i = 0; i < 100; i++)
+ {
+ binding.addConsumer(new SimpleString("B" + i + "<A"));
+ }
+
+ assertEquals(100, binding.getFilters().size());
+
+ for (int i = 0; i < 100; i++)
+ {
+ binding.removeConsumer(new SimpleString("B" + i + "<A"));
+ }
+
+ assertEquals(0, binding.getFilters().size());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class FakeQueue implements Queue
+ {
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
+ */
+ public void route(ServerMessage message, RoutingContext context) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getName()
+ */
+ public SimpleString getName()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getID()
+ */
+ public long getID()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getFilter()
+ */
+ public Filter getFilter()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getPageSubscription()
+ */
+ public PageSubscription getPageSubscription()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isDurable()
+ */
+ public boolean isDurable()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isTemporary()
+ */
+ public boolean isTemporary()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#addConsumer(org.hornetq.core.server.Consumer)
+ */
+ public void addConsumer(Consumer consumer) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#removeConsumer(org.hornetq.core.server.Consumer)
+ */
+ public void removeConsumer(Consumer consumer) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getConsumerCount()
+ */
+ public int getConsumerCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#reload(org.hornetq.core.server.MessageReference)
+ */
+ public void reload(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#addTail(org.hornetq.core.server.MessageReference)
+ */
+ public void addTail(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#addTail(org.hornetq.core.server.MessageReference, boolean)
+ */
+ public void addTail(MessageReference ref, boolean direct)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#addHead(org.hornetq.core.server.MessageReference)
+ */
+ public void addHead(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge(MessageReference ref) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ */
+ public void acknowledge(Transaction tx, MessageReference ref) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#reacknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ */
+ public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ */
+ public void cancel(Transaction tx, MessageReference ref) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference, long)
+ */
+ public void cancel(MessageReference reference, long timeBase) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#deliverAsync()
+ */
+ public void deliverAsync()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getMessageCount()
+ */
+ public long getMessageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getDeliveringCount()
+ */
+ public int getDeliveringCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#referenceHandled()
+ */
+ public void referenceHandled()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getScheduledCount()
+ */
+ public int getScheduledCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getScheduledMessages()
+ */
+ public List<MessageReference> getScheduledMessages()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getMessagesAdded()
+ */
+ public long getMessagesAdded()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#removeReferenceWithID(long)
+ */
+ public MessageReference removeReferenceWithID(long id) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getReference(long)
+ */
+ public MessageReference getReference(long id)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#deleteAllReferences()
+ */
+ public int deleteAllReferences() throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#deleteReference(long)
+ */
+ public boolean deleteReference(long messageID) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#deleteMatchingReferences(org.hornetq.core.filter.Filter)
+ */
+ public int deleteMatchingReferences(Filter filter) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#expireReference(long)
+ */
+ public boolean expireReference(long messageID) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
+ */
+ public int expireReferences(Filter filter) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#expireReferences()
+ */
+ public void expireReferences() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#expire(org.hornetq.core.server.MessageReference)
+ */
+ public void expire(MessageReference ref) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#sendMessageToDeadLetterAddress(long)
+ */
+ public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#sendMessagesToDeadLetterAddress(org.hornetq.core.filter.Filter)
+ */
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#changeReferencePriority(long, byte)
+ */
+ public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#changeReferencesPriority(org.hornetq.core.filter.Filter, byte)
+ */
+ public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReference(long, org.hornetq.api.core.SimpleString)
+ */
+ public boolean moveReference(long messageID, SimpleString toAddress) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReference(long, org.hornetq.api.core.SimpleString, boolean)
+ */
+ public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReferences(org.hornetq.core.filter.Filter, org.hornetq.api.core.SimpleString)
+ */
+ public int moveReferences(Filter filter, SimpleString toAddress) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReferences(org.hornetq.core.filter.Filter, org.hornetq.api.core.SimpleString, boolean)
+ */
+ public int moveReferences(Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#addRedistributor(long)
+ */
+ public void addRedistributor(long delay)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#cancelRedistributor()
+ */
+ public void cancelRedistributor() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#hasMatchingConsumer(org.hornetq.core.server.ServerMessage)
+ */
+ public boolean hasMatchingConsumer(ServerMessage message)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getConsumers()
+ */
+ public Collection<Consumer> getConsumers()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#checkRedelivery(org.hornetq.core.server.MessageReference, long)
+ */
+ public boolean checkRedelivery(MessageReference ref, long timeBase) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#iterator()
+ */
+ public LinkedListIterator<MessageReference> iterator()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.api.core.SimpleString)
+ */
+ public void setExpiryAddress(SimpleString expiryAddress)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#pause()
+ */
+ public void pause()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#resume()
+ */
+ public void resume()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isPaused()
+ */
+ public boolean isPaused()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getExecutor()
+ */
+ public Executor getExecutor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#resetAllIterators()
+ */
+ public void resetAllIterators()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
+ */
+ public boolean blockOnExecutorFuture()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#close()
+ */
+ public void close() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isDirectDeliver()
+ */
+ public boolean isDirectDeliver()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getAddress()
+ */
+ public SimpleString getAddress()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+
+}