[hornetq-commits] JBoss hornetq SVN: r8331 - in branches/ClebertTemporary: src/main/org/hornetq/core/persistence and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 19 22:01:52 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-19 22:01:51 -0500 (Thu, 19 Nov 2009)
New Revision: 8331

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Fixing tests

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -438,7 +438,7 @@
          {
             Transaction transaction = resourceManager.removeTransaction(xid);
             transaction.commit();
-            server.getStorageManager().completeOperations();
+            server.getStorageManager().waitOnOperations(-1);
             long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
             resourceManager.putHeuristicCompletion(recordID, xid, true);
             return true;

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -59,10 +59,14 @@
 
    void afterCompleteOperations(IOAsyncTask run);
    
-   /** Block until the replication is done. 
+   /** Block until the operations are done. 
     * @throws Exception */
    void waitOnOperations(long timeout) throws Exception;
 
+   /** Block until the operations are done. 
+    * @throws Exception */
+   void waitOnOperations() throws Exception;
+
    /** To close the OperationsContext */
    void completeOperations();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -309,6 +309,12 @@
       return replicator != null;
    }
 
+
+   public void waitOnOperations() throws Exception
+   {
+      waitOnOperations(-1);
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
     */
@@ -316,6 +322,12 @@
    {
       SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
       afterCompleteOperations(waitCallback);
+      completeOperations();
+      if (timeout <= 0)
+      {
+         waitCallback.waitCompletion();
+      }
+      else
       if (!waitCallback.waitCompletion(timeout))
       {
          throw new IllegalStateException("no response received from replication");

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -349,4 +349,11 @@
       run.done();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+    */
+   public void waitOnOperations() throws Exception
+   {
+   }
+
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -651,40 +651,46 @@
       return deleteMatchingReferences(null);
    }
 
-   public synchronized int deleteMatchingReferences(final Filter filter) throws Exception
+   public int deleteMatchingReferences(final Filter filter) throws Exception
    {
       int count = 0;
-
-      Transaction tx = new TransactionImpl(storageManager);
-
-      Iterator<MessageReference> iter = messageReferences.iterator();
-
-      while (iter.hasNext())
+      
+      synchronized(this)
       {
-         MessageReference ref = iter.next();
-
-         if (filter == null || filter.match(ref.getMessage()))
+   
+         Transaction tx = new TransactionImpl(storageManager);
+   
+         Iterator<MessageReference> iter = messageReferences.iterator();
+   
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            acknowledge(tx, ref);
-            iter.remove();
-            count++;
+            MessageReference ref = iter.next();
+   
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               deliveringCount.incrementAndGet();
+               acknowledge(tx, ref);
+               iter.remove();
+               count++;
+            }
          }
-      }
-
-      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
-      for (MessageReference messageReference : cancelled)
-      {
-         if (filter == null || filter.match(messageReference.getMessage()))
+   
+         List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+         for (MessageReference messageReference : cancelled)
          {
-            deliveringCount.incrementAndGet();
-            acknowledge(tx, messageReference);
-            count++;
+            if (filter == null || filter.match(messageReference.getMessage()))
+            {
+               deliveringCount.incrementAndGet();
+               acknowledge(tx, messageReference);
+               count++;
+            }
          }
+   
+         tx.commit();
       }
+      
+      storageManager.waitOnOperations(-1);
 
-      tx.commit();
-
       return count;
    }
 

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-20 01:51:27 UTC (rev 8330)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-20 03:01:51 UTC (rev 8331)
@@ -1238,10 +1238,15 @@
        */
       public void afterCompleteOperations(IOAsyncTask run)
       {
-         // TODO Auto-generated method stub
-         
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+       */
+      public void waitOnOperations() throws Exception
+      {
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list