[hornetq-commits] JBoss hornetq SVN: r8325 - in branches/ClebertTemporary: src/main/org/hornetq/core/paging and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 19 14:53:50 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-19 14:53:49 -0500 (Thu, 19 Nov 2009)
New Revision: 8325

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java
   branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
Removing recursive IO calls on the callbacks

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -1024,6 +1024,7 @@
       }
       catch (HornetQException e)
       {
+         log.warn(e.getMessage(), e);
          // This should never occur
          throw new XAException(XAException.XAER_RMERR);
       }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -67,14 +67,14 @@
     * @param message
     * @throws Exception
     */
-   void addSize(ServerMessage message, boolean add) throws Exception;
+   void addSize(ServerMessage message, boolean add);
    
    /**
     * 
     * @param reference
     * @throws Exception
     */
-   void addSize(MessageReference reference, boolean add) throws Exception;
+   void addSize(MessageReference reference, boolean add);
    
    /**
     * 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -281,7 +281,7 @@
       checkReleaseProducerFlowControlCredits(-credits);
    }
 
-   public void addSize(final ServerMessage message, final boolean add) throws Exception
+   public void addSize(final ServerMessage message, final boolean add)
    {
       long size = message.getMemoryEstimate();
 
@@ -299,7 +299,7 @@
       }
    }
 
-   public void addSize(final MessageReference reference, final boolean add) throws Exception
+   public void addSize(final MessageReference reference, final boolean add)
    {
       long size = MessageReferenceImpl.getMemoryEstimate();
 
@@ -479,7 +479,7 @@
       }
    }
 
-   public boolean startPaging() throws Exception
+   public boolean startPaging()
    {
       if (!running)
       {
@@ -510,7 +510,17 @@
       {
          if (currentPage == null)
          {
-            openNewPage();
+            try
+            {
+               openNewPage();
+            }
+            catch (Exception e)
+            {
+               // If it is not possible to start paging due to an IO error, we will just consider it non paging.
+               // This shouldn't happen anyway
+               log.warn("IO Error, impossible to start paging", e);
+               return false;
+            }
 
             return true;
          }
@@ -701,7 +711,7 @@
       }
    }
 
-   private void addSize(final long size) throws Exception
+   private void addSize(final long size) 
    {
       if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
       {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -176,7 +176,7 @@
       this.delayDeletionCount.incrementAndGet();
    }
 
-   public synchronized void decrementDelayDeletionCount() throws Exception
+   public synchronized void decrementDelayDeletionCount()
    {
       int count = this.delayDeletionCount.decrementAndGet();
 
@@ -191,7 +191,7 @@
       return new DecodingContext();
    }
 
-   private void checkDelete() throws Exception
+   private void checkDelete()
    {
       if (getRefCount() <= 0)
       {
@@ -220,7 +220,7 @@
    }
 
    @Override
-   public synchronized int decrementRefCount(MessageReference reference) throws Exception
+   public synchronized int decrementRefCount(MessageReference reference)
    {
       int currentRefCount = super.decrementRefCount(reference);
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -1865,11 +1865,11 @@
          }
       }
 
-      public void afterPrepare(final Transaction tx) throws Exception
+      public void afterPrepare(final Transaction tx)
       {
       }
 
-      public void afterRollback(final Transaction tx) throws Exception
+      public void afterRollback(final Transaction tx)
       {
          PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -138,7 +138,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#decrementDelayDeletionCount()
     */
-   public void decrementDelayDeletionCount() throws Exception
+   public void decrementDelayDeletionCount()
    {
 
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -211,7 +211,7 @@
          this.recordID = recordID;
       }
 
-      private void process() throws Exception
+      private void process()
       {
          if (!done)
          {
@@ -246,17 +246,17 @@
       {
       }
 
-      public void afterCommit(final Transaction tx) throws Exception
+      public void afterCommit(final Transaction tx)
       {
          process();
       }
 
-      public void afterPrepare(final Transaction tx) throws Exception
+      public void afterPrepare(final Transaction tx)
       {
          process();
       }
 
-      public void afterRollback(final Transaction tx) throws Exception
+      public void afterRollback(final Transaction tx)
       {
       }
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -1114,11 +1114,11 @@
          }
       }
 
-      public void afterPrepare(final Transaction tx) throws Exception
+      public void afterPrepare(final Transaction tx)
       {
       }
 
-      public void afterRollback(final Transaction tx) throws Exception
+      public void afterRollback(final Transaction tx)
       {
          PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 
@@ -1225,11 +1225,11 @@
          }
       }
 
-      public void afterPrepare(Transaction tx) throws Exception
+      public void afterPrepare(Transaction tx)
       {
       }
 
-      public void afterRollback(Transaction tx) throws Exception
+      public void afterRollback(Transaction tx)
       {
       }
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -42,5 +42,5 @@
    
    void incrementDelayDeletionCount();
    
-   void decrementDelayDeletionCount() throws Exception;
+   void decrementDelayDeletionCount();
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -125,6 +125,10 @@
    
    Collection<Consumer> getConsumers();
    
+   /** We can't execute IO operations when inside the IOCallback / TransactionCallback.
+    *  This method will perform IO operations in a second thread. */
+   boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception;
+   
    boolean checkDLQ(MessageReference ref) throws Exception;
    
    void lockDelivery();

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -33,7 +33,7 @@
 
    int incrementRefCount(MessageReference reference) throws Exception;
 
-   int decrementRefCount(MessageReference reference) throws Exception;
+   int decrementRefCount(MessageReference reference);
 
    int incrementDurableRefCount();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -1025,7 +1025,7 @@
                                             configuration.getManagementClusterPassword(),
                                             managementService);
 
-      queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository, storageManager);
+      queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
 
       pagingManager = createPagingManager();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -14,6 +14,7 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.core.filter.Filter;
@@ -50,6 +51,7 @@
                          final Filter filter,
                          final boolean durable,
                          final boolean temporary,
+                         final Executor executor,
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
                          final StorageManager storageManager,
@@ -61,6 +63,7 @@
             filter,
             durable,
             temporary,
+            executor,
             scheduledExecutor,
             postOffice,
             storageManager,

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -22,6 +22,7 @@
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -42,11 +43,16 @@
    private PostOffice postOffice;
 
    private final StorageManager storageManager;
+   
+   private final ExecutorFactory executorFactory;
 
-   public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+   public QueueFactoryImpl(final ExecutorFactory executorFactory,
+                           final ScheduledExecutorService scheduledExecutor,
                            final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                            final StorageManager storageManager)
    {
+      this.executorFactory = executorFactory;
+      
       this.addressSettingsRepository = addressSettingsRepository;
 
       this.scheduledExecutor = scheduledExecutor;
@@ -77,6 +83,7 @@
                                     filter,
                                     durable,
                                     temporary,
+                                    executorFactory.getExecutor(),
                                     scheduledExecutor,
                                     postOffice,
                                     storageManager,
@@ -90,6 +97,7 @@
                                filter,
                                durable,
                                temporary,
+                               executorFactory.getExecutor(),
                                scheduledExecutor,
                                postOffice,
                                storageManager,

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -114,6 +114,9 @@
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
    private final ScheduledExecutorService scheduledExecutor;
+   
+   /** We can't perform any operation on the journal while inside the Transactional operations. */
+   private final Executor journalExecutor;
 
    private final SimpleString address;
 
@@ -139,6 +142,7 @@
                     final Filter filter,
                     final boolean durable,
                     final boolean temporary,
+                    final Executor executor,
                     final ScheduledExecutorService scheduledExecutor,
                     final PostOffice postOffice,
                     final StorageManager storageManager,
@@ -163,6 +167,8 @@
       this.addressSettingsRepository = addressSettingsRepository;
 
       this.scheduledExecutor = scheduledExecutor;
+      
+      this.journalExecutor = executor;
 
       direct = true;
 
@@ -921,11 +927,36 @@
 
    public boolean checkDLQ(final MessageReference reference) throws Exception
    {
+      return checkDLQ(reference, null);
+   }
+   
+   public boolean checkDLQ(final MessageReference reference, Executor ioExecutor) throws Exception
+   {
       ServerMessage message = reference.getMessage();
 
       if (message.isDurable() && durable)
       {
-         storageManager.updateDeliveryCount(reference);
+         if (ioExecutor != null)
+         {
+            ioExecutor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     storageManager.updateDeliveryCount(reference);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn("Can't update delivery count on checkDLQ", e);
+                  }
+               }
+            });
+         }
+         else
+         {
+            storageManager.updateDeliveryCount(reference);
+         }
       }
 
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@@ -934,7 +965,27 @@
 
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
-         sendToDeadLetterAddress(reference);
+         if (ioExecutor != null)
+         {
+            ioExecutor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     sendToDeadLetterAddress(reference);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn("Error on DLQ send", e);
+                  }
+               }
+            });
+         }
+         else
+         {
+            sendToDeadLetterAddress(reference);
+         }
 
          return false;
       }
@@ -946,7 +997,27 @@
          {
             reference.setScheduledDeliveryTime(System.currentTimeMillis() + redeliveryDelay);
 
-            storageManager.updateScheduledDeliveryTime(reference);
+            if (ioExecutor != null)
+            {
+               ioExecutor.execute(new Runnable()
+               {
+                  public void run() // executed on ioExecutor, mirroring the synchronous else-branch below
+                  {
+                     try
+                     {
+                        storageManager.updateScheduledDeliveryTime(reference); // persist the redelivery time set just above
+                     }
+                     catch (Exception e) // cannot propagate from the executor thread, so log it
+                     {
+                        log.warn("Can't update scheduled delivery time on checkDLQ", e);
+                     }
+                  }
+               });
+            }
+            else
+            {
+               storageManager.updateScheduledDeliveryTime(reference);
+            }
          }
 
          deliveringCount.decrementAndGet();
@@ -1377,7 +1448,7 @@
       return status;
    }
 
-   private void removeExpiringReference(final MessageReference ref) throws Exception
+   private void removeExpiringReference(final MessageReference ref)
    {
       if (ref.getMessage().getExpiration() > 0)
       {
@@ -1385,7 +1456,7 @@
       }
    }
 
-   private void postAcknowledge(final MessageReference ref) throws Exception
+   private void postAcknowledge(final MessageReference ref)
    {
       ServerMessage message = ref.getMessage();
 
@@ -1427,7 +1498,7 @@
       message.decrementRefCount(ref);
    }
 
-   void postRollback(final LinkedList<MessageReference> refs) throws Exception
+   void postRollback(final LinkedList<MessageReference> refs)
    {
       synchronized (this)
       {
@@ -1477,29 +1548,38 @@
       {
       }
 
-      public void afterPrepare(final Transaction tx) throws Exception
+      public void afterPrepare(final Transaction tx)
       {
       }
 
-      public void afterRollback(final Transaction tx) throws Exception
+      public void afterRollback(final Transaction tx)
       {
          Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>();
 
          for (MessageReference ref : refsToAck)
          {
-            if (ref.getQueue().checkDLQ(ref))
+            try
             {
-               LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
-
-               if (toCancel == null)
+               if (ref.getQueue().checkDLQ(ref, journalExecutor))
                {
-                  toCancel = new LinkedList<MessageReference>();
-
-                  queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+                  LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
+   
+                  if (toCancel == null)
+                  {
+                     toCancel = new LinkedList<MessageReference>();
+   
+                     queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+                  }
+   
+                  toCancel.addFirst(ref);
                }
-
-               toCancel.addFirst(ref);
             }
+            catch (Exception e)
+            {
+               // checkDLQ here will be using an executor, this shouldn't happen
+               // don't you just hate checked exceptions in java?
+               log.warn("Error on checkDLQ", e);
+            }
          }
 
          for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry : queueMap.entrySet())
@@ -1515,7 +1595,7 @@
          }
       }
 
-      public void afterCommit(final Transaction tx) throws Exception
+      public void afterCommit(final Transaction tx)
       {
          for (MessageReference ref : refsToAck)
          {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -119,7 +119,7 @@
       return count;
    }
 
-   public int decrementRefCount(final MessageReference reference) throws Exception
+   public int decrementRefCount(final MessageReference reference) 
    {
       int count = refCount.decrementAndGet();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -766,6 +766,7 @@
 
    public void handleRollback(final RollbackMessage packet)
    {
+      new Exception("Rollback").printStackTrace();
       Packet response = null;
 
       try
@@ -1078,6 +1079,7 @@
 
    public void handleXARollback(final SessionXARollbackMessage packet)
    {
+      System.out.println("XARollback");
       Packet response = null;
 
       Xid xid = packet.getXid();

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -29,9 +29,9 @@
    
    void beforeRollback(Transaction tx) throws Exception;
    
-   void afterPrepare(Transaction tx) throws Exception;
+   void afterPrepare(Transaction tx);
       
-   void afterCommit(Transaction tx) throws Exception;
+   void afterCommit(Transaction tx);
    
-   void afterRollback(Transaction tx) throws Exception;   
+   void afterRollback(Transaction tx);   
 }

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -40,8 +40,20 @@
 {
    private static final Logger log = Logger.getLogger(QueueTest.class);
    
-   private QueueFactory queueFactory = new FakeQueueFactory();
+   private FakeQueueFactory queueFactory = new FakeQueueFactory();
    
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      queueFactory = new FakeQueueFactory();
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      queueFactory.stop();
+      super.tearDown();
+   }
+   
    /*
     * Concurrent set consumer not busy, busy then, call deliver while messages are being added and consumed
     */

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -538,15 +538,15 @@
       {
       }
 
-      public void afterPrepare(Transaction tx) throws Exception
+      public void afterPrepare(Transaction tx)
       {
       }
 
-      public void afterCommit(Transaction tx) throws Exception
+      public void afterCommit(Transaction tx)
       {
       }
 
-      public void afterRollback(Transaction tx) throws Exception
+      public void afterRollback(Transaction tx)
       {
          latch.countDown();
       }

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -15,6 +15,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -41,17 +43,23 @@
    private static final Logger log = Logger.getLogger(QueueImplTest.class);
 
    private ScheduledExecutorService scheduledExecutor;
+   
+   private ExecutorService executor;
 
    public void setUp() throws Exception
    {
    	super.setUp();
 
    	scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+   	
+   	executor = Executors.newSingleThreadExecutor();
    }
 
    public void tearDown() throws Exception
    {
    	scheduledExecutor.shutdownNow();
+   	
+   	executor.shutdown();
 
       super.tearDown();
    }
@@ -70,7 +78,7 @@
 
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null, null, null);
 
       //Send one scheduled
 
@@ -136,7 +144,7 @@
 
    private void testScheduled(boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1,new SimpleString("address1"), new SimpleString("queue1"), null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1,new SimpleString("address1"), new SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = null;
 
@@ -243,7 +251,7 @@
             return HandleStatus.HANDLED;
          }
       };
-      Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1, null, false, true, executor, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -1131,7 +1131,7 @@
          return null;
       }
 
-      public int decrementRefCount(MessageReference reference) throws Exception
+      public int decrementRefCount(MessageReference reference)
       {
          // TODO Auto-generated method stub
          return 0;

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -493,4 +493,13 @@
       return false;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference, java.util.concurrent.Executor)
+    */
+   public boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception
+   {
+      // Stub for the fake queue: ref and ioExecutor are ignored and false is always returned.
+      return false;
+   }
+
 }
\ No newline at end of file

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -17,6 +17,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -43,7 +44,23 @@
 {
    // The tests ----------------------------------------------------------------
 
-   private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+   private ScheduledExecutorService scheduledExecutor;
+   
+   private ExecutorService executor;
+   
+   protected void setUp() throws Exception
+   { // Creates the two executors that the tests in this class pass to the QueueImpl constructor.
+      super.setUp();
+      scheduledExecutor =  Executors.newSingleThreadScheduledExecutor(); // single-threaded scheduler
+      executor = Executors.newSingleThreadExecutor();      // single-threaded plain executor
+   }
+   
+   protected void tearDown() throws Exception
+   { // Shuts down both executors created in setUp(), then delegates to super.tearDown().
+      scheduledExecutor.shutdown(); // shutdown(), not shutdownNow(): already-submitted tasks are not cancelled
+      executor.shutdown();
+      super.tearDown();
+   }
 
    private static final SimpleString queue1 = new SimpleString("queue1");
 
@@ -53,18 +70,18 @@
    {
       final SimpleString name = new SimpleString("oobblle");
 
-      Queue queue = new QueueImpl(1, address1, name, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, name, null, false, true, executor, scheduledExecutor, null, null, null);
 
       assertEquals(name, queue.getName());
    }
 
    public void testDurable()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, false, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, false, executor, scheduledExecutor, null, null, null);
 
       assertFalse(queue.isDurable());
 
-      queue = new QueueImpl(1, address1, queue1, null, true, false, scheduledExecutor, null, null, null);
+      queue = new QueueImpl(1, address1, queue1, null, true, false, executor, scheduledExecutor, null, null, null);
 
       assertTrue(queue.isDurable());
    }
@@ -77,7 +94,7 @@
 
       Consumer cons3 = new FakeConsumer();
 
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       assertEquals(0, queue.getConsumerCount());
 
@@ -118,7 +135,7 @@
 
    public void testGetFilter()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       assertNull(queue.getFilter());
 
@@ -135,7 +152,7 @@
          }
       };
 
-      queue = new QueueImpl(1, address1, queue1, filter, false, true, scheduledExecutor, null, null, null);
+      queue = new QueueImpl(1, address1, queue1, filter, false, true, executor, scheduledExecutor, null, null, null);
 
       assertEquals(filter, queue.getFilter());
 
@@ -143,7 +160,7 @@
 
    public void testSimpleadd()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -162,7 +179,7 @@
 
    public void testSimpleDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -190,7 +207,7 @@
 
    public void testSimpleNonDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -228,7 +245,7 @@
 
    public void testBusyConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -272,7 +289,7 @@
 
    public void testBusyConsumerThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -339,7 +356,7 @@
 
    public void testAddFirstadd() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -399,7 +416,7 @@
                                   null,
                                   false,
                                   true,
-                                  scheduledExecutor,
+                                  executor, scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -556,7 +573,7 @@
 
    public void testConsumerReturningNull() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       class NullConsumer implements Consumer
       {
@@ -589,7 +606,7 @@
 
    public void testRoundRobinWithQueueing() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -632,7 +649,7 @@
 
    public void testRoundRobinDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -673,7 +690,7 @@
 
    public void testWithPriorities() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -740,7 +757,7 @@
 
    public void testConsumerWithFilterAddAndRemove()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -749,7 +766,7 @@
 
    public void testList()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 20;
 
@@ -773,7 +790,7 @@
 
    public void testListWithFilter()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 20;
 
@@ -815,7 +832,7 @@
                                   null,
                                   false,
                                   true,
-                                  scheduledExecutor,
+                                  executor, scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -887,7 +904,7 @@
 
    public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -928,7 +945,7 @@
 
    public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -1002,7 +1019,7 @@
 
    public void testConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1072,7 +1089,7 @@
                                   null,
                                   false,
                                   true,
-                                  scheduledExecutor,
+                                  executor, scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -1164,7 +1181,7 @@
    public void testMessageOrder() throws Exception
    {
       FakeConsumer consumer = new FakeConsumer();
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1184,7 +1201,7 @@
 
    public void testMessagesAdded() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1196,7 +1213,7 @@
 
    public void testGetReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1209,7 +1226,7 @@
 
    public void testGetNonExistentReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1226,7 +1243,7 @@
     */
    public void testPauseAndResumeWithAsync() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       // pauses the queue
       queue.pause();
@@ -1281,7 +1298,7 @@
 
    public void testPauseAndResumeWithDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
 
       // Now add a consumer
       FakeConsumer consumer = new FakeConsumer();

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-11-19 17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-11-19 19:53:49 UTC (rev 8325)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.unit.core.server.impl.fakes;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -32,14 +33,16 @@
  */
 public class FakeQueueFactory implements QueueFactory
 {
-	private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-	
+   private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+   
+   private final ExecutorService executor = Executors.newSingleThreadExecutor();
+   
 	private PostOffice postOffice;
 
 	public Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter,
 			                   boolean durable, boolean temporary)
 	{
-		return new QueueImpl(persistenceID, address, name, filter, durable, temporary, scheduledExecutor, postOffice, null, null);
+		return new QueueImpl(persistenceID, address, name, filter, durable, temporary, executor, scheduledExecutor, postOffice, null, null);
 	}
 	
    public void setPostOffice(PostOffice postOffice)
@@ -47,5 +50,12 @@
       this.postOffice = postOffice;
       
    }
+   
+   public void stop() throws Exception
+   { // Shuts down the two executors this factory passes to every QueueImpl it creates.
+      scheduledExecutor.shutdown(); // stops accepting new tasks; does not wait for termination
+      
+      executor.shutdown(); // same for the plain executor
+   }
 
 }



More information about the hornetq-commits mailing list