[jboss-cvs] JBoss Messaging SVN: r5267 - trunk/src/main/org/jboss/messaging/core/transaction/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 4 14:29:07 EST 2008
Author: timfox
Date: 2008-11-04 14:29:07 -0500 (Tue, 04 Nov 2008)
New Revision: 5267
Modified:
trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Cosmetics on transaction timeouts
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-04 19:18:07 UTC (rev 5266)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-04 19:29:07 UTC (rev 5267)
@@ -23,29 +23,28 @@
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
-import java.util.HashMap;
import java.util.Map;
-import java.util.LinkedList;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Callable;
import javax.transaction.xa.Xid;
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
/**
* A ResourceManagerImpl
@@ -66,7 +65,7 @@
private final ScheduledExecutorService executorService;
- private final Map<Xid, ScheduledFuture> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture>();
+ private final Map<Xid, ScheduledFuture<Boolean>> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture<Boolean>>();
private final StorageManager storageManager;
@@ -99,7 +98,9 @@
boolean added = transactions.putIfAbsent(xid, tx) == null;
if (added && timeoutSeconds > 0)
{
- ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx), timeoutSeconds, TimeUnit.SECONDS);
+ ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx),
+ timeoutSeconds,
+ TimeUnit.SECONDS);
scheduledTimeoutTxs.put(xid, future);
}
return added;
@@ -124,7 +125,7 @@
{
if (timeoutSeconds == 0)
{
- //reset to default
+ // reset to default
this.timeoutSeconds = defaultTimeoutSeconds;
}
else
@@ -148,19 +149,26 @@
return xids;
}
- class TxTimeoutHandler implements Callable
+ private class TxTimeoutHandler implements Callable
{
final Transaction tx;
- public TxTimeoutHandler(Transaction tx) {this.tx = tx;}
+ public TxTimeoutHandler(final Transaction tx)
+ {
+ this.tx = tx;
+ }
public Object call() throws Exception
{
transactions.remove(tx.getXid());
+
log.warn("transaction with xid " + tx.getXid() + " timed out");
+
List<MessageReference> rolledBack = tx.timeout();
+
Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+ // TODO - this code is duplicated in ServerSessionImpl - combine
for (MessageReference ref : rolledBack)
{
if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
@@ -186,6 +194,7 @@
entry.getKey().addListFirst(refs);
}
+
return null;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-04 19:18:07 UTC (rev 5266)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-04 19:29:07 UTC (rev 5267)
@@ -180,9 +180,12 @@
for (MessageReference ref : refs)
{
scheduledReferences.put(ref, scheduledDeliveryTime);
- if(ref.getQueue().isDurable())
+ if (ref.getQueue().isDurable())
{
- storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+ storageManager.storeMessageReferenceScheduledTransactional(id,
+ ref.getQueue().getPersistenceID(),
+ message.getMessageID(),
+ scheduledDeliveryTime);
}
}
}
@@ -190,14 +193,15 @@
public List<MessageReference> timeout() throws Exception
{
- //we need to synchronize with commit and rollback just in case they get called atthesame time
+ // we need to synchronize with commit and rollback just in case they get called atthesame time
synchronized (timeoutLock)
{
- //if we've already rolled back or committed we don't need to do anything
- if(state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+ // if we've already rolled back or committed we don't need to do anything
+ if (state == State.COMMITTED || state == State.ROLLBACK_ONLY)
{
return Collections.emptyList();
}
+
return doRollback();
}
}
@@ -217,7 +221,7 @@
{
pagingManager.messageDone(message);
}
-
+
if (message.isDurable())
{
Queue queue = acknowledgement.getQueue();
@@ -270,12 +274,7 @@
}
public void commit() throws Exception
- {
-// if (inMethod != -1)
-// {
-// throw new IllegalStateException("Can't commit, already inmethod " + inMethod);
-// }
- inMethod = 2;
+ {
synchronized (timeoutLock)
{
if (state == State.ROLLBACK_ONLY)
@@ -318,7 +317,7 @@
for (MessageReference ref : refsToAdd)
{
Long scheduled = scheduledReferences.get(ref);
- if(scheduled == null)
+ if (scheduled == null)
{
ref.getQueue().addLast(ref);
}
@@ -346,16 +345,10 @@
state = State.COMMITTED;
}
- inMethod = -1;
}
public List<MessageReference> rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
-// if (inMethod != -1)
-// {
-// throw new IllegalStateException("Can't rollback, already inmethod " + inMethod);
-// }
- inMethod=1;
LinkedList<MessageReference> toCancel;
synchronized (timeoutLock)
{
@@ -379,7 +372,6 @@
state = State.ROLLEDBACK;
}
- inMethod = -1;
return toCancel;
}
@@ -399,23 +391,11 @@
for (MessageReference ref : acknowledgements)
{
-// Queue queue = ref.getQueue();
-//
-// ServerMessage message = ref.getMessage();
-
- // Putting back the size on pagingManager, and reverting the counters
-
- //FIXME - why????
- //Surely paging happens before routing, so cancellation shouldn't effect anything......
-// if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
-// {
-// pagingManager.addSize(message);
-// }
-
toCancel.add(ref);
}
clear();
+
return toCancel;
}
@@ -500,15 +480,8 @@
// Private
// -------------------------------------------------------------------
- private volatile int inMethod;
-
private List<MessageReference> route(final ServerMessage message) throws Exception
{
-// if (inMethod != -1)
-// {
-// throw new IllegalStateException("Can't route, already inmethod " + inMethod);
-// }
- inMethod = 0;
List<MessageReference> refs = postOffice.route(message);
refsToAdd.addAll(refs);
@@ -519,7 +492,7 @@
containsPersistent = true;
}
- inMethod = -1;
+
return refs;
}
@@ -542,7 +515,6 @@
for (ServerMessage message : pagedMessages)
{
-
// http://wiki.jboss.org/wiki/JBossMessaging2Paging
// Explained under Transaction On Paging. (This is the item B)
if (pagingManager.page(message, id))
@@ -585,9 +557,12 @@
for (MessageReference ref : refs)
{
scheduledReferences.put(ref, scheduledDeliveryTime);
- if(ref.getQueue().isDurable())
+ if (ref.getQueue().isDurable())
{
- storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+ storageManager.storeMessageReferenceScheduledTransactional(id,
+ ref.getQueue().getPersistenceID(),
+ message.getMessageID(),
+ scheduledDeliveryTime);
}
}
}
More information about the jboss-cvs-commits mailing list