[jboss-cvs] JBoss Messaging SVN: r5267 - trunk/src/main/org/jboss/messaging/core/transaction/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 4 14:29:07 EST 2008


Author: timfox
Date: 2008-11-04 14:29:07 -0500 (Tue, 04 Nov 2008)
New Revision: 5267

Modified:
   trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Cosmetics on transaction timeouts


Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-04 19:18:07 UTC (rev 5266)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-04 19:29:07 UTC (rev 5267)
@@ -23,29 +23,28 @@
 package org.jboss.messaging.core.transaction.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.LinkedList;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Callable;
 
 import javax.transaction.xa.Xid;
 
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
 
 /**
  * A ResourceManagerImpl
@@ -66,7 +65,7 @@
 
    private final ScheduledExecutorService executorService;
 
-   private final Map<Xid, ScheduledFuture> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture>();
+   private final Map<Xid, ScheduledFuture<Boolean>> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture<Boolean>>();
 
    private final StorageManager storageManager;
 
@@ -99,7 +98,9 @@
       boolean added = transactions.putIfAbsent(xid, tx) == null;
       if (added && timeoutSeconds > 0)
       {
-         ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx), timeoutSeconds, TimeUnit.SECONDS);
+         ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx),
+                                                                    timeoutSeconds,
+                                                                    TimeUnit.SECONDS);
          scheduledTimeoutTxs.put(xid, future);
       }
       return added;
@@ -124,7 +125,7 @@
    {
       if (timeoutSeconds == 0)
       {
-         //reset to default
+         // reset to default
          this.timeoutSeconds = defaultTimeoutSeconds;
       }
       else
@@ -148,19 +149,26 @@
       return xids;
    }
 
-   class TxTimeoutHandler implements Callable
+   private class TxTimeoutHandler implements Callable
    {
       final Transaction tx;
 
-      public TxTimeoutHandler(Transaction tx) {this.tx = tx;}
+      public TxTimeoutHandler(final Transaction tx)
+      {
+         this.tx = tx;
+      }
 
       public Object call() throws Exception
       {
          transactions.remove(tx.getXid());
+
          log.warn("transaction with xid " + tx.getXid() + " timed out");
+
          List<MessageReference> rolledBack = tx.timeout();
+
          Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
+         // TODO - this code is duplicated in ServerSessionImpl - combine
          for (MessageReference ref : rolledBack)
          {
             if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
@@ -186,6 +194,7 @@
 
             entry.getKey().addListFirst(refs);
          }
+
          return null;
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-04 19:18:07 UTC (rev 5266)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-04 19:29:07 UTC (rev 5267)
@@ -180,9 +180,12 @@
          for (MessageReference ref : refs)
          {
             scheduledReferences.put(ref, scheduledDeliveryTime);
-            if(ref.getQueue().isDurable())
+            if (ref.getQueue().isDurable())
             {
-               storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+               storageManager.storeMessageReferenceScheduledTransactional(id,
+                                                                          ref.getQueue().getPersistenceID(),
+                                                                          message.getMessageID(),
+                                                                          scheduledDeliveryTime);
             }
          }
       }
@@ -190,14 +193,15 @@
 
    public List<MessageReference> timeout() throws Exception
    {
-      //we need to synchronize with commit and rollback just in case they get called atthesame time
+      // we need to synchronize with commit and rollback just in case they get called at the same time
       synchronized (timeoutLock)
       {
-         //if we've already rolled back or committed we don't need to do anything
-         if(state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+         // if we've already rolled back or committed we don't need to do anything
+         if (state == State.COMMITTED || state == State.ROLLBACK_ONLY)
          {
             return Collections.emptyList();
          }
+         
          return doRollback();
       }
    }
@@ -217,7 +221,7 @@
       {
          pagingManager.messageDone(message);
       }
-      
+
       if (message.isDurable())
       {
          Queue queue = acknowledgement.getQueue();
@@ -270,12 +274,7 @@
    }
 
    public void commit() throws Exception
-   {      
-//      if (inMethod != -1)
-//      {
-//         throw new IllegalStateException("Can't commit, already inmethod " + inMethod);
-//      }
-      inMethod = 2;
+   {
       synchronized (timeoutLock)
       {
          if (state == State.ROLLBACK_ONLY)
@@ -318,7 +317,7 @@
          for (MessageReference ref : refsToAdd)
          {
             Long scheduled = scheduledReferences.get(ref);
-            if(scheduled == null)
+            if (scheduled == null)
             {
                ref.getQueue().addLast(ref);
             }
@@ -346,16 +345,10 @@
 
          state = State.COMMITTED;
       }
-      inMethod = -1;
    }
 
    public List<MessageReference> rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
-//      if (inMethod != -1)
-//      {
-//         throw new IllegalStateException("Can't rollback, already inmethod " + inMethod);
-//      }
-      inMethod=1;
       LinkedList<MessageReference> toCancel;
       synchronized (timeoutLock)
       {
@@ -379,7 +372,6 @@
          state = State.ROLLEDBACK;
       }
 
-      inMethod = -1;
       return toCancel;
    }
 
@@ -399,23 +391,11 @@
 
       for (MessageReference ref : acknowledgements)
       {
-//         Queue queue = ref.getQueue();
-//
-//         ServerMessage message = ref.getMessage();
-
-         // Putting back the size on pagingManager, and reverting the counters
-
-         //FIXME - why????
-         //Surely paging happens before routing, so cancellation shouldn't effect anything......
-//         if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
-//         {
-//            pagingManager.addSize(message);
-//         }
-
          toCancel.add(ref);
       }
 
       clear();
+
       return toCancel;
    }
 
@@ -500,15 +480,8 @@
    // Private
    // -------------------------------------------------------------------
 
-   private volatile int inMethod;
-   
    private List<MessageReference> route(final ServerMessage message) throws Exception
    {
-//      if (inMethod != -1)
-//      {
-//         throw new IllegalStateException("Can't route, already inmethod " + inMethod);
-//      }
-      inMethod = 0;
       List<MessageReference> refs = postOffice.route(message);
 
       refsToAdd.addAll(refs);
@@ -519,7 +492,7 @@
 
          containsPersistent = true;
       }
-      inMethod = -1;
+
       return refs;
    }
 
@@ -542,7 +515,6 @@
 
       for (ServerMessage message : pagedMessages)
       {
-
          // http://wiki.jboss.org/wiki/JBossMessaging2Paging
          // Explained under Transaction On Paging. (This is the item B)
          if (pagingManager.page(message, id))
@@ -585,9 +557,12 @@
             for (MessageReference ref : refs)
             {
                scheduledReferences.put(ref, scheduledDeliveryTime);
-               if(ref.getQueue().isDurable())
+               if (ref.getQueue().isDurable())
                {
-                  storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+                  storageManager.storeMessageReferenceScheduledTransactional(id,
+                                                                             ref.getQueue().getPersistenceID(),
+                                                                             message.getMessageID(),
+                                                                             scheduledDeliveryTime);
                }
             }
          }




More information about the jboss-cvs-commits mailing list