JBoss hornetq SVN: r10189 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-08 16:20:33 -0500 (Tue, 08 Feb 2011)
New Revision: 10189
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
upload new build
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-02-08 21:00:38 UTC (rev 10188)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-02-08 21:20:33 UTC (rev 10189)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.EAP-QA-10183"/>
+ <property name="hornetq.version" value="2.2.0.EAP-QA-10185"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-02-08 21:00:38 UTC (rev 10188)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-02-08 21:20:33 UTC (rev 10189)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.EAP-QA-10183</hornetq.version>
+ <hornetq.version>2.2.0.EAP-QA-10185</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-02-08 21:00:38 UTC (rev 10188)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-02-08 21:20:33 UTC (rev 10189)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=QA_10182
+hornetq.version.versionName=QA_10185
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=0
13 years, 11 months
JBoss hornetq SVN: r10188 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-08 16:00:38 -0500 (Tue, 08 Feb 2011)
New Revision: 10188
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
Fixing Iterators leakage on QueueImpl - https://issues.jboss.org/browse/JBPAPP-5870
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -40,6 +40,7 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -400,17 +401,24 @@
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
queue.blockOnExecutorFuture();
- Iterator<MessageReference> iterator = queue.iterator();
- while (iterator.hasNext())
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+ try
{
- MessageReference ref = (MessageReference)iterator.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iterator.hasNext())
{
- Message message = ref.getMessage();
- messages.add(message.toMap());
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ Message message = ref.getMessage();
+ messages.add(message.toMap());
+ }
}
+ return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
}
- return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
+ finally
+ {
+ iterator.close();
+ }
}
catch (HornetQException e)
{
@@ -451,17 +459,24 @@
}
else
{
- Iterator<MessageReference> iterator = queue.iterator();
- int count = 0;
- while (iterator.hasNext())
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+ try
{
- MessageReference ref = (MessageReference)iterator.next();
- if (filter.match(ref.getMessage()))
+ int count = 0;
+ while (iterator.hasNext())
{
- count++;
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter.match(ref.getMessage()))
+ {
+ count++;
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iterator.close();
+ }
}
}
finally
@@ -574,7 +589,6 @@
return moveMessages(filterStr, otherQueueName, false);
}
-
public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
{
checkStarted();
@@ -827,7 +841,7 @@
obj.put("sessionID", serverConsumer.getSessionID());
obj.put("browseOnly", serverConsumer.isBrowseOnly());
obj.put("creationTime", serverConsumer.getCreationTime());
-
+
jsonArray.put(obj);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -14,7 +14,6 @@
package org.hornetq.core.server;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@@ -22,6 +21,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
*
@@ -131,7 +131,7 @@
boolean checkDLQ(MessageReference ref) throws Exception;
- Iterator<MessageReference> iterator();
+ LinkedListIterator<MessageReference> iterator();
void setExpiryAddress(SimpleString expiryAddress);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -93,18 +93,18 @@
private final boolean temporary;
private final PostOffice postOffice;
-
+
private final PageSubscription pageSubscription;
-
+
private final LinkedListIterator<PagedReference> pageIterator;
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
-
+
// The quantity of pagedReferences on messageREferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
-
+
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
@@ -157,7 +157,7 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
-
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -171,20 +171,19 @@
final Executor executor)
{
this(id,
- address,
- name,
- filter,
- null,
- durable,
- temporary,
- scheduledExecutor,
- postOffice,
- storageManager,
- addressSettingsRepository,
- executor);
+ address,
+ name,
+ filter,
+ null,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
}
-
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -205,7 +204,7 @@
this.name = name;
this.filter = filter;
-
+
this.pageSubscription = pageSubscription;
this.durable = durable;
@@ -230,7 +229,7 @@
{
expiryAddress = null;
}
-
+
if (pageSubscription != null)
{
pageSubscription.setQueue(this);
@@ -294,7 +293,7 @@
{
return name;
}
-
+
public SimpleString getAddress()
{
return address;
@@ -309,7 +308,7 @@
{
return pageSubscription;
}
-
+
public Filter getFilter()
{
return filter;
@@ -328,7 +327,6 @@
directDeliver = false;
}
-
public synchronized void reload(final MessageReference ref)
{
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -364,7 +362,11 @@
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
if (checkDirect)
{
- if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging())
+ if (direct && !directDeliver &&
+ concurrentQueue.isEmpty() &&
+ messageReferences.isEmpty() &&
+ !pageIterator.hasNext() &&
+ !pageSubscription.isPaging())
{
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries
@@ -380,7 +382,7 @@
{
return;
}
-
+
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -614,56 +616,72 @@
return false;
}
- public Iterator<MessageReference> iterator()
+ public LinkedListIterator<MessageReference> iterator()
{
return new SynchronizedIterator(messageReferences.iterator());
}
public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
{
- Iterator<MessageReference> iterator = iterator();
+ LinkedListIterator<MessageReference> iterator = iterator();
- MessageReference removed = null;
-
- while (iterator.hasNext())
+ try
{
- MessageReference ref = iterator.next();
- if (ref.getMessage().getMessageID() == id)
+ MessageReference removed = null;
+
+ while (iterator.hasNext())
{
- iterator.remove();
- refRemoved(ref);
+ MessageReference ref = iterator.next();
- removed = ref;
+ if (ref.getMessage().getMessageID() == id)
+ {
+ iterator.remove();
+ refRemoved(ref);
- break;
+ removed = ref;
+
+ break;
+ }
}
+
+ if (removed == null)
+ {
+ // Look in scheduled deliveries
+ removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+ }
+
+ return removed;
}
-
- if (removed == null)
+ finally
{
- // Look in scheduled deliveries
- removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+ iterator.close();
}
-
- return removed;
}
public synchronized MessageReference getReference(final long id)
{
- Iterator<MessageReference> iterator = iterator();
+ LinkedListIterator<MessageReference> iterator = iterator();
- while (iterator.hasNext())
+ try
{
- MessageReference ref = iterator.next();
- if (ref.getMessage().getMessageID() == id)
+ while (iterator.hasNext())
{
- return ref;
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id)
+ {
+ return ref;
+ }
}
+
+ return null;
}
-
- return null;
+ finally
+ {
+ iterator.close();
+ }
}
public long getMessageCount()
@@ -674,8 +692,11 @@
{
if (pageSubscription != null)
{
- // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
- return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+ // messageReferences will have depaged messages which we need to discount from the counter as they are
+ // counted on the pageSubscription as well
+ return messageReferences.size() + getScheduledCount() +
+ deliveringCount.get() +
+ pageSubscription.getMessageCount();
}
else
{
@@ -709,9 +730,9 @@
else
{
ServerMessage message = ref.getMessage();
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
storageManager.storeAcknowledge(id, message.getMessageID());
@@ -726,22 +747,22 @@
if (ref.isPaged())
{
pageSubscription.ackTx(tx, (PagedReference)ref);
-
+
getRefsOperation(tx).addAck(ref);
}
else
{
ServerMessage message = ref.getMessage();
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
+
tx.setContainsPersistent();
}
-
+
getRefsOperation(tx).addAck(ref);
}
}
@@ -756,8 +777,8 @@
}
getRefsOperation(tx).addAck(ref);
-
- //https://issues.jboss.org/browse/HORNETQ-609
+
+ // https://issues.jboss.org/browse/HORNETQ-609
deliveringCount.incrementAndGet();
}
@@ -848,33 +869,40 @@
Transaction tx = new TransactionImpl(storageManager);
- Iterator<MessageReference> iter = iterator();
-
- while (iter.hasNext())
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iter.hasNext())
{
+ MessageReference ref = iter.next();
+
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
+ }
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+ for (MessageReference messageReference : cancelled)
+ {
deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- refRemoved(ref);
+ acknowledge(tx, messageReference);
count++;
}
+
+ tx.commit();
+
+ return count;
}
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
- for (MessageReference messageReference : cancelled)
+ finally
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
+ iter.close();
}
-
- tx.commit();
-
- return count;
}
public synchronized boolean deleteReference(final long messageID) throws Exception
@@ -883,44 +911,58 @@
Transaction tx = new TransactionImpl(storageManager);
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
+ {
- while (iter.hasNext())
- {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- refRemoved(ref);
- deleted = true;
- break;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ deleted = true;
+ break;
+ }
}
- }
- tx.commit();
+ tx.commit();
- return deleted;
+ return deleted;
+ }
+ finally
+ {
+ iter.close();
+ }
}
public synchronized boolean expireReference(final long messageID) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
+ {
- while (iter.hasNext())
- {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int expireReferences(final Filter filter) throws Exception
@@ -928,112 +970,150 @@
Transaction tx = new TransactionImpl(storageManager);
int count = 0;
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(tx, ref);
- iter.remove();
- refRemoved(ref);
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ expire(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
}
- }
- tx.commit();
+ tx.commit();
- return count;
+ return count;
+ }
+ finally
+ {
+ iter.close();
+ }
}
public synchronized void expireReferences() throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ MessageReference ref = iter.next();
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
}
}
+ finally
+ {
+ iter.close();
+ }
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- sendToDeadLetterAddress(ref);
- iter.remove();
- refRemoved(ref);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ refRemoved(ref);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
{
int count = 0;
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- sendToDeadLetterAddress(ref);
- iter.remove();
- refRemoved(ref);
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iter.close();
+ }
}
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
return moveReference(messageID, toAddress, false);
}
-
- public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, final boolean rejectDuplicate) throws Exception
+
+ public synchronized boolean moveReference(final long messageID,
+ final SimpleString toAddress,
+ final boolean rejectDuplicate) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- iter.remove();
- refRemoved(ref);
- deliveringCount.incrementAndGet();
- try
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
{
- move(toAddress, ref);
+ iter.remove();
+ refRemoved(ref);
+ deliveringCount.incrementAndGet();
+ try
+ {
+ move(toAddress, ref);
+ }
+ catch (Exception e)
+ {
+ deliveringCount.decrementAndGet();
+ throw e;
+ }
+ return true;
}
- catch (Exception e)
- {
- deliveringCount.decrementAndGet();
- throw e;
- }
- return true;
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
@@ -1041,7 +1121,9 @@
return moveReferences(filter, toAddress, false);
}
- public synchronized int moveReferences(final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates) throws Exception
+ public synchronized int moveReferences(final Filter filter,
+ final SimpleString toAddress,
+ final boolean rejectDuplicates) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -1049,64 +1131,83 @@
try
{
- Iterator<MessageReference> iter = iterator();
-
- DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
-
- while (iter.hasNext())
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+
+ DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
+
+ while (iter.hasNext())
{
- boolean ignored = false;
-
- deliveringCount.incrementAndGet();
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ boolean ignored = false;
- if (rejectDuplicates)
- {
- byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
- if (duplicateBytes != null)
+ deliveringCount.incrementAndGet();
+ count++;
+
+ if (rejectDuplicates)
{
- if (targetDuplicateCache.contains(duplicateBytes))
+ byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
{
- log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
- acknowledge(tx, ref);
- ignored = true;
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+ " was already set at " +
+ toAddress +
+ ". Move from " +
+ this.address +
+ " being ignored and message removed from " +
+ this.address);
+ acknowledge(tx, ref);
+ ignored = true;
+ }
}
}
+
+ if (!ignored)
+ {
+ move(toAddress, tx, ref, false, rejectDuplicates);
+ }
+ iter.remove();
}
-
- if (!ignored)
- {
- move(toAddress, tx, ref, false, rejectDuplicates);
- }
- iter.remove();
}
- }
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
- for (MessageReference ref : cancelled)
- {
- byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
- if (duplicateBytes != null)
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+ for (MessageReference ref : cancelled)
{
- if (targetDuplicateCache.contains(duplicateBytes))
+ byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
{
- log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored");
- continue;
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+ " was already set at " +
+ toAddress +
+ ". Move from " +
+ this.address +
+ " being ignored");
+ continue;
+ }
}
+
+ deliveringCount.incrementAndGet();
+ count++;
+ move(toAddress, tx, ref, false, rejectDuplicates);
+ acknowledge(tx, ref);
}
-
- deliveringCount.incrementAndGet();
- count++;
- move(toAddress, tx, ref, false, rejectDuplicates);
- acknowledge(tx, ref);
+
+ tx.commit();
+
+ return count;
}
-
- tx.commit();
-
- return count;
+ finally
+ {
+ iter.close();
+ }
}
catch (Exception e)
{
@@ -1117,42 +1218,57 @@
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+
+ while (iter.hasNext())
{
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ iter.remove();
+ refRemoved(ref);
+ ref.getMessage().setPriority(newPriority);
+ addTail(ref, false);
+ return true;
+ }
}
+
+ return false;
}
-
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- int count = 0;
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ int count = 0;
+ while (iter.hasNext())
{
- count++;
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ count++;
+ iter.remove();
+ refRemoved(ref);
+ ref.getMessage().setPriority(newPriority);
+ addTail(ref, false);
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized void resetAllIterators()
@@ -1179,7 +1295,7 @@
{
return paused;
}
-
+
public boolean isDirectDeliver()
{
return directDeliver;
@@ -1225,7 +1341,6 @@
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
-
/**
* @param ref
*/
@@ -1236,7 +1351,6 @@
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
-
private synchronized void doPoll()
{
MessageReference ref = concurrentQueue.poll();
@@ -1315,7 +1429,7 @@
if (checkExpired(ref))
{
holder.iter.remove();
-
+
refRemoved(ref);
continue;
@@ -1342,7 +1456,7 @@
if (status == HandleStatus.HANDLED)
{
holder.iter.remove();
-
+
refRemoved(ref);
if (groupID != null && groupConsumer == null)
@@ -1382,14 +1496,13 @@
pos = 0;
}
}
-
+
if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
{
scheduleDepage();
}
}
-
/**
* @param ref
*/
@@ -1401,7 +1514,7 @@
pagedReferences.decrementAndGet();
}
}
-
+
/**
* @param ref
*/
@@ -1413,31 +1526,29 @@
}
}
-
private void scheduleDepage()
{
executor.execute(depageRunner);
}
-
+
private void depage()
{
if (paused || pageIterator == null || consumerList.isEmpty())
{
return;
}
-
+
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
-
- //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+
+ // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
PagedReference reference = pageIterator.next();
addTail(reference, false);
pageIterator.remove();
}
- //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
-
-
+ // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+
deliverAsync();
}
@@ -1496,7 +1607,7 @@
return true;
}
}
-
+
/** Used on testing only **/
public int getNumberOfReferences()
{
@@ -1746,7 +1857,7 @@
QueueImpl queue = (QueueImpl)ref.getQueue();
queue.deliveringCount.decrementAndGet();
-
+
if (queue.deliveringCount.get() < 0)
{
new Exception("DeliveringCount became negative").printStackTrace();
@@ -1839,6 +1950,7 @@
private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
List<ServerMessage> pagedMessagesToPostACK = null;
synchronized void addAck(final MessageReference ref)
@@ -1912,7 +2024,7 @@
postAcknowledge(ref);
}
}
-
+
if (pagedMessagesToPostACK != null)
{
for (ServerMessage msg : pagedMessagesToPostACK)
@@ -1936,8 +2048,9 @@
public void beforeRollback(final Transaction tx) throws Exception
{
}
-
- public List<MessageReference> getRelatedMessageReferences() {
+
+ public List<MessageReference> getRelatedMessageReferences()
+ {
return refsToAck;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -45,6 +45,7 @@
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.TypedProperties;
/**
@@ -100,7 +101,7 @@
*/
private final boolean browseOnly;
- private Runnable browserDeliverer;
+ private BrowserDeliverer browserDeliverer;
private final boolean strictUpdateDeliveryCount;
@@ -313,8 +314,12 @@
largeMessageDeliverer.finish();
}
- if (!browseOnly)
+ if (browseOnly)
{
+ browserDeliverer.close();
+ }
+ else
+ {
messageQueue.removeConsumer(this);
}
@@ -901,12 +906,17 @@
{
private MessageReference current = null;
- public BrowserDeliverer(final Iterator<MessageReference> iterator)
+ public BrowserDeliverer(final LinkedListIterator<MessageReference> iterator)
{
this.iterator = iterator;
}
- private final Iterator<MessageReference> iterator;
+ private final LinkedListIterator<MessageReference> iterator;
+
+ public synchronized void close()
+ {
+ iterator.close();
+ }
public synchronized void run()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -155,11 +155,18 @@
private LinkedListIterator<T> lastIter;
private int resetCount = lastReset;
+
+ volatile boolean closed = false;
PriorityLinkedListIterator()
{
index = levels.length - 1;
}
+
+ protected void finalize()
+ {
+ close();
+ }
public void repeat()
{
@@ -173,13 +180,17 @@
public void close()
{
- lastIter = null;
-
- for (LinkedListIterator<T> iter : cachedIters)
+ if (!closed)
{
- if (iter != null)
+ closed = true;
+ lastIter = null;
+
+ for (LinkedListIterator<T> iter : cachedIters)
{
- iter.close();
+ if (iter != null)
+ {
+ iter.close();
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.postoffice.impl;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
* A FakeQueue
@@ -431,7 +431,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#iterator()
*/
- public Iterator<MessageReference> iterator()
+ public LinkedListIterator<MessageReference> iterator()
{
// TODO Auto-generated method stub
return null;
13 years, 11 months
JBoss hornetq SVN: r10187 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-08 13:18:54 -0500 (Tue, 08 Feb 2011)
New Revision: 10187
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-08 18:08:54 UTC (rev 10186)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-08 18:18:54 UTC (rev 10187)
@@ -122,7 +122,7 @@
super.tearDown();
}
-
+
public void testOrder1() throws Throwable
{
boolean persistentMessages = true;
@@ -209,18 +209,6 @@
}
}
- session.commit();
-
- for (ServerSession sessionServer : server.getSessions())
- {
- sessionServer.close(true);
- }
-
- OperationContextImpl.getContext().waitCompletion();
-
- assertEquals(numberOfMessages - 100, queue.getMessageCount());
- assertEquals(numberOfMessages, queue.getMessagesAdded());
-
session.close();
session = null;
13 years, 11 months
JBoss hornetq SVN: r10186 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-08 13:08:54 -0500 (Tue, 08 Feb 2011)
New Revision: 10186
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
Fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-07 22:10:53 UTC (rev 10185)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-08 18:08:54 UTC (rev 10186)
@@ -122,7 +122,7 @@
super.tearDown();
}
-
+
public void testOrder1() throws Throwable
{
boolean persistentMessages = true;
@@ -216,18 +216,20 @@
sessionServer.close(true);
}
-
OperationContextImpl.getContext().waitCompletion();
assertEquals(numberOfMessages - 100, queue.getMessageCount());
assertEquals(numberOfMessages, queue.getMessagesAdded());
+
+ session.close();
+
+ session = null;
- OperationContextImpl.getContext().waitCompletion();
+ sf.close();
+ sf = locator.createSessionFactory();
- ((ClientSessionFactoryImpl)sf).stopPingingAfterOne();
+ locator = createInVMNonHALocator();
- sf = locator.createSessionFactory();
-
session = sf.createSession(true, true, 0);
session.start();
@@ -241,6 +243,10 @@
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
+
+ session.close();
+
+ sf.close();
}
catch (Throwable e)
13 years, 11 months
JBoss hornetq SVN: r10185 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-07 17:10:53 -0500 (Mon, 07 Feb 2011)
New Revision: 10185
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
Log:
Fixing tests on paging and transactions
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -29,6 +29,8 @@
boolean isCommit();
boolean isRollback();
+
+ void setCommitted(boolean committed);
void commit();
@@ -44,6 +46,8 @@
void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
+ void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception;
+
void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
// To be used after the update was stored or reload
@@ -63,4 +67,5 @@
* @return true if the message will be delivered later, false if it should be delivered right away
*/
boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -87,9 +87,12 @@
/**
* To be called when the cursor decided to ignore a position.
+ *
* @param position
*/
void positionIgnored(PagePosition position);
+
+ void lateDeliveryRollback(PagePosition position);
/**
* To be used to avoid a redelivery of a prepared ACK after load
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -73,7 +73,7 @@
private static void trace(final String message)
{
- PageSubscriptionImpl.log.trace(message);
+ PageSubscriptionImpl.log.trace(message);
}
private volatile boolean autoCleanup = true;
@@ -99,9 +99,9 @@
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
-
+
private final PageSubscriptionCounter counter;
-
+
private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
@@ -178,8 +178,6 @@
confirmPosition(position);
}
-
-
public long getMessageCount()
{
return counter.getValue() - deliveredCount.get();
@@ -189,7 +187,7 @@
{
return counter;
}
-
+
public void scheduleCleanupCheck()
{
if (autoCleanup)
@@ -278,7 +276,7 @@
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo completePage : completedPages)
- {
+ {
if (isTrace)
{
PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
@@ -423,7 +421,7 @@
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
{
confirmPosition(tx, reference.getPosition());
-
+
counter.increment(tx, -1);
PageTransactionInfo txInfo = getPageTransaction(reference);
@@ -480,7 +478,7 @@
return consumedPages.firstKey();
}
}
-
+
public void addPendingDelivery(final PagePosition position)
{
getPageInfo(position).incrementPendingTX();
@@ -551,6 +549,13 @@
processACK(position);
}
+
+ public void lateDeliveryRollback(PagePosition position)
+ {
+ PageCursorInfo cursorInfo = processACK(position);
+ cursorInfo.decrementPendingTX();
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
*/
@@ -753,7 +758,7 @@
// To be called only after the ACK has been processed and guaranteed to be on storae
// The only exception is on non storage events such as not matching messages
- private void processACK(final PagePosition pos)
+ private PageCursorInfo processACK(final PagePosition pos)
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
@@ -770,6 +775,8 @@
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
+
+ return info;
}
/**
@@ -783,7 +790,7 @@
// It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent();
}
-
+
getPageInfo(position).remove(position);
PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -846,7 +853,7 @@
// The page was live at the time of the creation
private final boolean wasLive;
-
+
// There's a pending TX to add elements on this page
private AtomicInteger pendingTX = new AtomicInteger(0);
@@ -902,12 +909,12 @@
{
return pageId;
}
-
+
public void incrementPendingTX()
{
pendingTX.incrementAndGet();
}
-
+
public void decrementPendingTX()
{
pendingTX.decrementAndGet();
@@ -932,12 +939,13 @@
if (isTrace)
{
PageSubscriptionImpl.trace("numberOfMessages = " + getNumberOfMessages() +
- " confirmed = " +
- (confirmed.get() + 1) +
- ", page = " +
- pageId);
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ " pendingTX = " + pendingTX +
+ ", page = " +
+ pageId);
}
-
+
// Negative could mean a bookmark on the first element for the page (example -1)
if (posACK.getMessageNr() >= 0)
{
@@ -1018,7 +1026,7 @@
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
*/
@@ -1026,7 +1034,6 @@
{
return Collections.emptyList();
}
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -172,6 +172,25 @@
*/
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
{
+ internalUpdatePageManager(storageManager, pagingManager, tx, 1);
+ }
+
+ public void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception
+ {
+ UpdatePageTXOperation updt = internalUpdatePageManager(storageManager, pagingManager, tx, increment);
+ updt.setStored();
+ }
+
+ /**
+ * @param storageManager
+ * @param pagingManager
+ * @param tx
+ */
+ protected UpdatePageTXOperation internalUpdatePageManager(final StorageManager storageManager,
+ final PagingManager pagingManager,
+ final Transaction tx,
+ final int increment)
+ {
UpdatePageTXOperation pgtxUpdate = (UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
if (pgtxUpdate == null)
@@ -183,7 +202,9 @@
tx.setContainsPersistent();
- pgtxUpdate.addUpdate(this);
+ pgtxUpdate.addUpdate(this, increment);
+
+ return pgtxUpdate;
}
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
@@ -213,6 +234,11 @@
{
return committed;
}
+
+ public void setCommitted(final boolean committed)
+ {
+ this.committed = committed;
+ }
public boolean isRollback()
{
@@ -228,8 +254,9 @@
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
- pos.a.positionIgnored(pos.b);
+ pos.a.lateDeliveryRollback(pos.b);
}
+ lateDeliveries = null;
}
}
@@ -303,8 +330,13 @@
this.pagingManager = pagingManager;
}
- public void addUpdate(PageTransactionInfo info)
+ public void setStored()
{
+ stored = true;
+ }
+
+ public void addUpdate(final PageTransactionInfo info, final int increment)
+ {
AtomicInteger counter = countsToUpdate.get(info);
if (counter == null)
@@ -313,7 +345,7 @@
countsToUpdate.put(info, counter);
}
- counter.incrementAndGet();
+ counter.addAndGet(increment);
}
public void beforePrepare(Transaction tx) throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -1664,16 +1664,27 @@
}
case PAGE_TRANSACTION:
{
+
PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+ if (record.isUpdate)
+ {
+ PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
+ pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
+ }
+ else
+ {
+ pageTransactionInfo.setCommitted(false);
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
+ }
- pagingManager.addTransaction(pageTransactionInfo);
-
- tx.addOperation(new FinishPageMessageOperation());
-
break;
}
case SET_SCHEDULED_DELIVERY_TIME:
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -309,8 +309,6 @@
doRollback();
- state = State.ROLLEDBACK;
-
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
@@ -327,6 +325,7 @@
public void done()
{
afterRollback();
+ state = State.ROLLEDBACK;
}
});
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -154,20 +154,20 @@
return (Iterator[])Array.newInstance(Iterator.class, size);
}
- private void removeAfter(Node<E> after)
+ private void removeAfter(Node<E> node)
{
- Node<E> toRemove = after.next;
+ Node<E> toRemove = node.next;
- after.next = toRemove.next;
+ node.next = toRemove.next;
if (toRemove.next != null)
{
- toRemove.next.prev = after;
+ toRemove.next.prev = node;
}
if (toRemove == tail)
{
- tail = after;
+ tail = node;
}
size--;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -1209,8 +1209,8 @@
clientSession.rollback(xid);
clientSession.start();
m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
m.acknowledge();
- Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
13 years, 11 months
JBoss hornetq SVN: r10184 - branches/Branch_2_2_EAP/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-04 20:25:35 -0500 (Fri, 04 Feb 2011)
New Revision: 10184
Modified:
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
upload release
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-02-05 01:21:04 UTC (rev 10183)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-02-05 01:25:35 UTC (rev 10184)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=QA_10162
+hornetq.version.versionName=QA_10182
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=0
13 years, 11 months
JBoss hornetq SVN: r10183 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-04 20:21:04 -0500 (Fri, 04 Feb 2011)
New Revision: 10183
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
upload release
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-02-04 23:47:26 UTC (rev 10182)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-02-05 01:21:04 UTC (rev 10183)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.EAP-QA-10169"/>
+ <property name="hornetq.version" value="2.2.0.EAP-QA-10183"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-02-04 23:47:26 UTC (rev 10182)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-02-05 01:21:04 UTC (rev 10183)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.EAP-QA-10169</hornetq.version>
+ <hornetq.version>2.2.0.EAP-QA-10183</hornetq.version>
</properties>
<licenses>
13 years, 11 months
JBoss hornetq SVN: r10182 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-04 18:47:26 -0500 (Fri, 04 Feb 2011)
New Revision: 10182
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
oops
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 23:25:01 UTC (rev 10181)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 23:47:26 UTC (rev 10182)
@@ -112,17 +112,6 @@
super.tearDown();
}
- public void testRepeat() throws Exception
- {
- for (int i = 0; i < 100; i++)
- {
- System.out.println(" ####################### test " + i);
- testPreparePersistent();
- tearDown();
- setUp();
- }
- }
-
public void testPreparePersistent() throws Exception
{
clearData();
13 years, 11 months
JBoss hornetq SVN: r10181 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-04 18:25:01 -0500 (Fri, 04 Feb 2011)
New Revision: 10181
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on non-blocking paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.LinkedListIterator;
@@ -30,6 +31,8 @@
// Cursor query operations --------------------------------------
+ PagingStore getPagingStore();
+
// To be called before the server is down
void stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -49,7 +49,7 @@
public synchronized PagedMessage getPagedMessage()
{
- PagedMessage returnMessage = message.get();
+ PagedMessage returnMessage = message != null ? message.get() : null;
// We only keep a few references on the Queue from paging...
// Besides those references are SoftReferenced on page cache...
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -69,11 +69,11 @@
// Attributes ----------------------------------------------------
- private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
+ private final boolean isTrace = PageSubscriptionImpl.log.isTraceEnabled();
private static void trace(final String message)
{
- PageSubscriptionImpl.log.info(message);
+ PageSubscriptionImpl.log.trace(message);
}
private volatile boolean autoCleanup = true;
@@ -131,6 +131,11 @@
// Public --------------------------------------------------------
+ public PagingStore getPagingStore()
+ {
+ return pageStore;
+ }
+
public Queue getQueue()
{
return queue;
@@ -534,6 +539,7 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
+ deliveredCount.incrementAndGet();
installTXCallback(tx, position);
}
@@ -777,6 +783,8 @@
// It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent();
}
+
+ getPageInfo(position).remove(position);
PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -903,6 +911,7 @@
public void decrementPendingTX()
{
pendingTX.decrementAndGet();
+ checkDone();
}
public boolean isRemoved(final PagePosition pos)
@@ -928,17 +937,26 @@
", page = " +
pageId);
}
-
+
// Negative could mean a bookmark on the first element for the page (example -1)
if (posACK.getMessageNr() >= 0)
{
- if (getNumberOfMessages() == confirmed.incrementAndGet())
- {
- onPageDone(this);
- }
+ confirmed.incrementAndGet();
+ checkDone();
}
}
+ /**
+ *
+ */
+ protected void checkDone()
+ {
+ if (isDone())
+ {
+ onPageDone(this);
+ }
+ }
+
private int getNumberOfMessages()
{
if (wasLive)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -56,6 +56,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -1715,6 +1716,7 @@
if (sub != null)
{
sub.reloadPreparedACK(tx, encoding.position);
+ referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub));
}
else
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
@@ -103,6 +104,9 @@
// The quantity of pagedReferences on messageREferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
+
+ // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
+ private final AtomicInteger queueMemorySize = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -327,6 +331,7 @@
public synchronized void reload(final MessageReference ref)
{
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
if (!scheduledDeliveryHandler.checkAndSchedule(ref))
{
internalAddTail(ref);
@@ -375,6 +380,8 @@
{
return;
}
+
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -1214,22 +1221,18 @@
*/
private void internalAddTail(final MessageReference ref)
{
- if (ref.isPaged())
- {
- pagedReferences.incrementAndGet();
- }
+ refAdded(ref);
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
+
/**
* @param ref
*/
private void internalAddHead(final MessageReference ref)
{
- if (ref.isPaged())
- {
- pagedReferences.incrementAndGet();
- }
+ queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
@@ -1392,12 +1395,25 @@
*/
private void refRemoved(MessageReference ref)
{
+ queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
if (ref.isPaged())
{
pagedReferences.decrementAndGet();
}
}
+ /**
+ * @param ref
+ */
+ protected void refAdded(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ }
+
+
private void scheduleDepage()
{
executor.execute(depageRunner);
@@ -1405,33 +1421,23 @@
private void depage()
{
- if (paused || consumerList.isEmpty())
+ if (paused || pageIterator == null || consumerList.isEmpty())
{
return;
}
- int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() + getScheduledCount() + concurrentQueue.size());
+ long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
- if (msgsToDeliver > 0)
+ //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
- //System.out.println("Depaging " + msgsToDeliver + " messages");
- //System.out.println("Depage " + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-
- int nmessages = 0;
- while (nmessages < msgsToDeliver && pageIterator.hasNext())
- {
- nmessages ++;
- addTail(pageIterator.next(), false);
- pageIterator.remove();
- }
-
- //System.out.println("Depaged " + nmessages);
+ PagedReference reference = pageIterator.next();
+ addTail(reference, false);
+ pageIterator.remove();
}
-// else
-// {
-// System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-// }
+ //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+
deliverAsync();
}
@@ -1737,13 +1743,26 @@
private void postAcknowledge(final MessageReference ref)
{
+ QueueImpl queue = (QueueImpl)ref.getQueue();
+
+ queue.deliveringCount.decrementAndGet();
+
+ if (queue.deliveringCount.get() < 0)
+ {
+ new Exception("DeliveringCount became negative").printStackTrace();
+ }
+
+ if (ref.isPaged())
+ {
+ // nothing to be done
+ return;
+ }
+
final ServerMessage message = ref.getMessage();
- QueueImpl queue = (QueueImpl)ref.getQueue();
-
boolean durableRef = message.isDurable() && queue.durable;
- if (durableRef && ! ref.isPaged())
+ if (durableRef)
{
int count = message.decrementDurableRefCount();
@@ -1775,13 +1794,6 @@
}
}
- queue.deliveringCount.decrementAndGet();
-
- if (queue.deliveringCount.get() < 0)
- {
- new Exception("DeliveringCount became negative").printStackTrace();
- }
-
try
{
message.decrementRefCount();
@@ -1827,10 +1839,19 @@
private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+ List<ServerMessage> pagedMessagesToPostACK = null;
synchronized void addAck(final MessageReference ref)
{
refsToAck.add(ref);
+ if (ref.isPaged())
+ {
+ if (pagedMessagesToPostACK == null)
+ {
+ pagedMessagesToPostACK = new ArrayList<ServerMessage>();
+ }
+ pagedMessagesToPostACK.add(ref.getMessage());
+ }
}
public void beforeCommit(final Transaction tx) throws Exception
@@ -1891,6 +1912,21 @@
postAcknowledge(ref);
}
}
+
+ if (pagedMessagesToPostACK != null)
+ {
+ for (ServerMessage msg : pagedMessagesToPostACK)
+ {
+ try
+ {
+ msg.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
}
public void beforePrepare(final Transaction tx) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-04 23:25:01 UTC (rev 10181)
@@ -112,11 +112,19 @@
super.tearDown();
}
+ public void testRepeat() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println(" ####################### test " + i);
+ testPreparePersistent();
+ tearDown();
+ setUp();
+ }
+ }
+
public void testPreparePersistent() throws Exception
{
- boolean persistentMessages = true;
-
- System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
@@ -133,8 +141,12 @@
final int messageSize = 1024;
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 5000;
+ final int numberOfTX = 10;
+
+ final int messagesPerTX = numberOfMessages / numberOfTX;
+
try
{
ServerLocator locator = createInVMNonHALocator();
@@ -164,7 +176,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- message = session.createMessage(persistentMessages);
+ message = session.createMessage(true);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -204,7 +216,7 @@
LinkedList<Xid> xids = new LinkedList<Xid>();
int msgReceived = 0;
- for (int i = 0; i < numberOfMessages / 999; i++)
+ for (int i = 0; i < numberOfTX; i++)
{
ClientSession sessionConsumer = sf.createSession(true, false, false);
Xid xid = newXID();
@@ -212,14 +224,14 @@
sessionConsumer.start(xid, XAResource.TMNOFLAGS);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
- for (int msgCount = 0; msgCount < 1000; i++)
+ for (int msgCount = 0; msgCount < messagesPerTX; msgCount++)
{
if (msgReceived == numberOfMessages)
{
break;
}
- System.out.println("MsgReceived = " + (msgReceived++));
- ClientMessage msg = consumer.receive(5000);
+ msgReceived++;
+ ClientMessage msg = consumer.receive(10000);
assertNotNull(msg);
msg.acknowledge();
}
@@ -236,8 +248,6 @@
sessionCheck.close();
- System.out.println(queue.getMessagesAdded());
-
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -262,25 +272,75 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
+
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
- assertNull(consumer.receiveImmediate());
+ ClientMessage msg = consumer.receive(5000);
+ if (msg != null)
+ {
+ System.out.println("Msg " + msg.getIntProperty("id"));
- for (Xid xid : xids)
+ while (true)
+ {
+ ClientMessage msg2 = consumer.receive(1000);
+ if (msg2 == null)
+ {
+ break;
+ }
+ System.out.println("Msg received again : " + msg2.getIntProperty("id"));
+
+ }
+ }
+ assertNull(msg);
+
+ for (int i = xids.size() -1 ; i >= 0; i--)
{
+ Xid xid = xids.get(i);
session.rollback(xid);
}
+ System.out.println("msgCount = " + queue.getMessageCount());
xids.clear();
- assertNotNull(consumer.receiveImmediate());
+ session.close();
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ assertEquals(i, msg.getIntProperty("id").intValue());
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
session.close();
sf.close();
locator.close();
- queue.getMessageCount();
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
// assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
@@ -331,7 +391,10 @@
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+ session.createQueue(PagingTest.ADDRESS,
+ PagingTest.ADDRESS.concat("-invalid"),
+ new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER),
+ true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -374,16 +437,16 @@
session.commit();
}
}
-
+
session.commit();
-
+
session.commit();
session.commit();
-
+
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
store.getCursorProvier().cleanup();
-
+
long timeout = System.currentTimeMillis() + 5000;
while (store.isPaging() && timeout > System.currentTimeMillis())
{
@@ -402,7 +465,7 @@
try
{
server.stop();
- // System.exit(-1);
+ // System.exit(-1);
}
catch (Throwable ignored)
{
@@ -1792,6 +1855,10 @@
for (int i = 0; i < numberOfMessages; i++)
{
System.out.println("Received " + i);
+ if (i == 55)
+ {
+ System.out.println("i = 55");
+ }
ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
13 years, 11 months
JBoss hornetq SVN: r10180 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-04 16:52:52 -0500 (Fri, 04 Feb 2011)
New Revision: 10180
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
Log:
Fixing compilation
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2011-02-04 15:03:29 UTC (rev 10179)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2011-02-04 21:52:52 UTC (rev 10180)
@@ -109,7 +109,6 @@
qResourceAdapter.setDiscoveryPort(i);
qResourceAdapter.setDiscoveryRefreshTimeout(l);
qResourceAdapter.setDupsOKBatchSize(i);
- qResourceAdapter.setFailoverOnServerShutdown(b);
qResourceAdapter.setMinLargeMessageSize(i);
qResourceAdapter.setPassword(testpass);
qResourceAdapter.setPreAcknowledge(b);
@@ -145,7 +144,6 @@
assertEquals(qResourceAdapter.getDiscoveryPort(), i);
assertEquals(qResourceAdapter.getDiscoveryRefreshTimeout(), l);
assertEquals(qResourceAdapter.getDupsOKBatchSize(), i);
- assertEquals(qResourceAdapter.getFailoverOnServerShutdown(), b);
assertEquals(qResourceAdapter.getMinLargeMessageSize(), i);
assertEquals(qResourceAdapter.getPassword(), testpass);
assertEquals(qResourceAdapter.getPreAcknowledge(), b);
13 years, 11 months