Author: clebert.suconic(a)jboss.com
Date: 2009-11-19 22:01:51 -0500 (Thu, 19 Nov 2009)
New Revision: 8331
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Fixing tests
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -438,7 +438,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.commit();
- server.getStorageManager().completeOperations();
+ server.getStorageManager().waitOnOperations(-1);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -59,10 +59,14 @@
void afterCompleteOperations(IOAsyncTask run);
- /** Block until the replication is done.
+ /** Block until the operations are done.
* @throws Exception */
void waitOnOperations(long timeout) throws Exception;
+ /** Block until the operations are done.
+ * @throws Exception */
+ void waitOnOperations() throws Exception;
+
/** To close the OperationsContext */
void completeOperations();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -309,6 +309,12 @@
return replicator != null;
}
+
+ public void waitOnOperations() throws Exception
+ {
+ waitOnOperations(-1);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
@@ -316,6 +322,12 @@
{
SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
afterCompleteOperations(waitCallback);
+ completeOperations();
+ if (timeout <= 0)
+ {
+ waitCallback.waitCompletion();
+ }
+ else
if (!waitCallback.waitCompletion(timeout))
{
throw new IllegalStateException("no response received from replication");
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -349,4 +349,11 @@
run.done();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ }
+
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -651,40 +651,46 @@
return deleteMatchingReferences(null);
}
- public synchronized int deleteMatchingReferences(final Filter filter) throws Exception
+ public int deleteMatchingReferences(final Filter filter) throws Exception
{
int count = 0;
-
- Transaction tx = new TransactionImpl(storageManager);
-
- Iterator<MessageReference> iter = messageReferences.iterator();
-
- while (iter.hasNext())
+
+ synchronized(this)
{
- MessageReference ref = iter.next();
-
- if (filter == null || filter.match(ref.getMessage()))
+
+ Transaction tx = new TransactionImpl(storageManager);
+
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- count++;
+ MessageReference ref = iter.next();
+
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ count++;
+ }
}
- }
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
- for (MessageReference messageReference : cancelled)
- {
- if (filter == null || filter.match(messageReference.getMessage()))
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
+ if (filter == null || filter.match(messageReference.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
+ }
}
+
+ tx.commit();
}
+
+ storageManager.waitOnOperations(-1);
- tx.commit();
-
return count;
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-20 03:01:51 UTC (rev 8331)
@@ -1238,10 +1238,15 @@
*/
public void afterCompleteOperations(IOAsyncTask run)
{
- // TODO Auto-generated method stub
-
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory