[jboss-cvs] JBoss Messaging SVN: r4261 - in trunk/src/main/org/jboss/messaging/core: transaction and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 21 06:03:17 EDT 2008
Author: ataylor
Date: 2008-05-21 06:03:17 -0400 (Wed, 21 May 2008)
New Revision: 4261
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
aded a flag on transaction so that it can be marked as failed if a send fails. This is so a client sending messages one way will always get a fail on commit.
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,21 +21,6 @@
*/
package org.jboss.messaging.core.server.impl;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -46,24 +31,11 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConnection;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -72,19 +44,27 @@
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Session implementation
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> Parts derived from
* JBM 1.x ServerSessionImpl by
- *
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision: 3783 $</tt>
- *
- * $Id: ServerSessionImpl.java 3783 2008-02-25 12:15:14Z timfox $
+ * <p/>
+ * $Id: ServerSessionImpl.java 3783 2008-02-25 12:15:14Z timfox $
*/
public class ServerSessionImpl implements ServerSession
{
@@ -136,7 +116,7 @@
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Transaction tx;
-
+
private final Object rollbackCancelLock = new Object();
// Constructors
@@ -150,7 +130,7 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice, final SecurityStore securityStore) throws Exception
{
- this.id = id;
+ this.id = id;
this.autoCommitSends = autoCommitSends;
@@ -188,7 +168,7 @@
public long getID()
{
- return id;
+ return id;
}
public ServerConnection getConnection()
@@ -229,16 +209,16 @@
public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
Delivery delivery;
-
+
synchronized (rollbackCancelLock)
{
long nextID = deliveryIDSequence.getAndIncrement();
-
+
delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
-
+
deliveries.add(delivery);
}
-
+
delivery.deliver();
}
@@ -246,7 +226,7 @@
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
- for (ServerConsumer consumer: consumersClone)
+ for (ServerConsumer consumer : consumersClone)
{
consumer.setStarted(s);
}
@@ -256,7 +236,7 @@
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
- for (ServerConsumer consumer: consumersClone)
+ for (ServerConsumer consumer : consumersClone)
{
consumer.close();
}
@@ -265,7 +245,7 @@
Set<ServerBrowserImpl> browsersClone = new HashSet<ServerBrowserImpl>(browsers);
- for (ServerBrowserImpl browser: browsersClone)
+ for (ServerBrowserImpl browser : browsersClone)
{
browser.close();
}
@@ -274,7 +254,7 @@
Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers);
- for (ServerProducer producer: producersClone)
+ for (ServerProducer producer : producersClone)
{
producer.close();
}
@@ -297,8 +277,31 @@
public void send(final ServerMessage msg) throws Exception
{
- //check the user has write access to this address
- securityStore.check(msg.getDestination(), CheckType.WRITE, connection);
+ //check the user has write access to this address.
+ try
+ {
+ securityStore.check(msg.getDestination(), CheckType.WRITE, connection);
+ }
+ catch (Exception e)
+ {
+ if (!autoCommitSends)
+ {
+ MessagingException messagingException;
+ if (e instanceof MessagingException)
+ {
+ messagingException = (MessagingException) e;
+ }
+ else
+ {
+ messagingException = new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage());
+ }
+ tx.markAsFailed(messagingException);
+ }
+ else
+ {
+ throw e;
+ }
+ }
msg.setMessageID(persistenceManager.generateMessageID());
@@ -309,33 +312,33 @@
if (autoCommitSends)
{
- List<MessageReference> refs = postOffice.route(msg);
+ List<MessageReference> refs = postOffice.route(msg);
- if (msg.getDurableRefCount() != 0)
- {
- persistenceManager.storeMessage(msg);
- }
+ if (msg.getDurableRefCount() != 0)
+ {
+ persistenceManager.storeMessage(msg);
+ }
- for (MessageReference ref: refs)
- {
- ref.getQueue().addLast(ref);
- }
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
}
else
{
- tx.addMessage(msg);
+ tx.addMessage(msg);
}
}
public void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
{
- /*
- Note that we do not consider it an error if the deliveries cannot be found to be acked.
- This can legitimately occur if a connection/session/consumer is closed
- from inside a MessageHandlers onMessage method. In this situation the close will cancel any unacked
- deliveries, but the subsequent call to delivered() will try and ack again and not find the last
- delivery on the server.
- */
+ /*
+ Note that we do not consider it an error if the deliveries cannot be found to be acked.
+ This can legitimately occur if a connection/session/consumer is closed
+ from inside a MessageHandlers onMessage method. In this situation the close will cancel any unacked
+ deliveries, but the subsequent call to delivered() will try and ack again and not find the last
+ delivery on the server.
+ */
if (allUpTo)
{
// Ack all deliveries up to and including the specified id
@@ -360,11 +363,11 @@
if (autoCommitAcks)
{
- doAck(ref);
+ doAck(ref);
}
else
{
- tx.addAcknowledgement(ref);
+ tx.addAcknowledgement(ref);
//Del count is not actually updated in storage unless it's cancelled
ref.incrementDeliveryCount();
@@ -398,7 +401,7 @@
if (autoCommitAcks)
{
- doAck(ref);
+ doAck(ref);
}
else
{
@@ -433,14 +436,14 @@
{
tx.addAcknowledgement(del.getReference());
}
-
+
deliveries.clear();
-
+
deliveryIDSequence.addAndGet(-tx.getAcknowledgementsCount());
}
-
+
tx.rollback(queueSettingsRepository);
-
+
tx = new TransactionImpl(persistenceManager, postOffice);
}
@@ -497,9 +500,16 @@
public void commit() throws Exception
{
- tx.commit();
+ try
+ {
+ tx.commit();
+ }
+ finally
+ {
+ tx = new TransactionImpl(persistenceManager, postOffice);
+ }
- tx = new TransactionImpl(persistenceManager, postOffice);
+
}
public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
@@ -507,7 +517,7 @@
if (tx != null)
{
final String msg = "Cannot commit, session is currently doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -521,9 +531,12 @@
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
- if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot commit transaction, it is suspended " + xid); }
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ return new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot commit transaction, it is suspended " + xid);
+ }
theTx.commit();
@@ -563,7 +576,7 @@
if (theTx == null)
{
final String msg = "Cannot find suspended transaction to end "
- + xid;
+ + xid;
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
@@ -600,8 +613,11 @@
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
- if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid); }
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ return new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
+ }
tx = theTx;
@@ -613,7 +629,7 @@
if (tx != null)
{
final String msg = "Cannot commit, session is currently doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -627,9 +643,12 @@
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
- if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot prepare transaction, it is suspended " + xid); }
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ return new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot prepare transaction, it is suspended " + xid);
+ }
if (theTx.isEmpty())
{
@@ -659,7 +678,7 @@
if (tx != null)
{
final String msg = "Cannot resume, session is currently doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -673,9 +692,12 @@
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
- if (theTx.getState() != Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot resume transaction, it is not suspended " + xid); }
+ if (theTx.getState() != Transaction.State.SUSPENDED)
+ {
+ return new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot resume transaction, it is not suspended " + xid);
+ }
tx = theTx;
@@ -689,7 +711,7 @@
if (tx != null)
{
final String msg = "Cannot roll back, session is currently doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -703,9 +725,12 @@
return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
}
- if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot rollback transaction, it is suspended " + xid); }
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ return new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot rollback transaction, it is suspended " + xid);
+ }
theTx.rollback(queueSettingsRepository);
@@ -726,7 +751,7 @@
if (tx != null)
{
final String msg = "Cannot start, session is already doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -738,7 +763,7 @@
if (!added)
{
final String msg = "Cannot start, there is already a xid "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
}
@@ -751,7 +776,7 @@
if (tx == null)
{
final String msg = "Cannot suspend, session is not doing work in a transaction "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -759,7 +784,7 @@
if (tx.getState() == Transaction.State.SUSPENDED)
{
final String msg = "Cannot suspend, transaction is already suspended "
- + tx.getXid();
+ + tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
@@ -792,7 +817,7 @@
if (!postOffice.addDestination(address, temporary))
{
- throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+ throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
}
else
{
@@ -805,7 +830,7 @@
public void removeDestination(final SimpleString address, final boolean temporary) throws Exception
{
- securityStore.check(address, CheckType.CREATE, connection);
+ securityStore.check(address, CheckType.CREATE, connection);
if (!postOffice.removeDestination(address, temporary))
{
@@ -815,20 +840,20 @@
{
if (temporary)
{
- connection.removeTemporaryDestination(address);
+ connection.removeTemporaryDestination(address);
}
}
}
public void createQueue(final SimpleString address, final SimpleString queueName,
- final SimpleString filterString, boolean durable, final boolean temporary) throws Exception
+ final SimpleString filterString, boolean durable, final boolean temporary) throws Exception
{
//make sure the user has privileges to create this address
if (!postOffice.containsDestination(address))
{
try
{
- securityStore.check(address, CheckType.CREATE, connection);
+ securityStore.check(address, CheckType.CREATE, connection);
}
catch (MessagingException e)
{
@@ -855,7 +880,7 @@
}
binding = postOffice.addBinding(address, queueName, filter, durable,
- temporary);
+ temporary);
if (temporary)
{
@@ -878,12 +903,12 @@
if (queue.getConsumerCount() != 0)
{
- throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
}
if (queue.isDurable())
{
- binding.getQueue().deleteAllReferences(persistenceManager);
+ binding.getQueue().deleteAllReferences(persistenceManager);
}
if (queue.isTemporary())
@@ -923,15 +948,15 @@
maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
long id = dispatcher.generateID();
-
+
ServerConsumer consumer =
- new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
- this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
+ new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+ this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
dispatcher.register(new ServerConsumerPacketHandler(consumer));
SessionCreateConsumerResponseMessage response =
- new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
+ new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
consumers.add(consumer);
@@ -958,8 +983,8 @@
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSize(),
- queue.getConsumerCount(), queue.getMessageCount(),
- filterString, binding.getAddress());
+ queue.getConsumerCount(), queue.getMessageCount(),
+ filterString, binding.getAddress());
}
else
{
@@ -984,7 +1009,7 @@
{
List<Binding> bindings = postOffice.getBindingsForAddress(request.getAddress());
- for (Binding binding: bindings)
+ for (Binding binding : bindings)
{
queueNames.add(binding.getQueue().getName());
}
@@ -994,7 +1019,7 @@
}
public SessionCreateBrowserResponseMessage createBrowser(final SimpleString queueName, final SimpleString filterString)
- throws Exception
+ throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -1006,7 +1031,7 @@
securityStore.check(binding.getAddress(), CheckType.READ, connection);
long id = dispatcher.generateID();
-
+
ServerBrowserImpl browser = new ServerBrowserImpl(id, this, binding.getQueue(), filterString == null ? null : filterString.toString());
browsers.add(browser);
@@ -1015,41 +1040,42 @@
return new SessionCreateBrowserResponseMessage(browser.getID());
}
-
+
/**
* Create a producer for the specified address
- * @param address The address to produce too
+ *
+ * @param address The address to produce too
* @param windowSize - the producer window size to use for flow control.
- * Specify -1 to disable flow control completely
- * The actual window size used may be less than the specified window size if the queue's maxSize attribute
- * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
- * specified on the queue
+ * Specify -1 to disable flow control completely
+ * The actual window size used may be less than the specified window size if the queue's maxSize attribute
+ * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
+ * specified on the queue
*/
public SessionCreateProducerResponseMessage createProducer(final long clientTargetID, final SimpleString address, final int windowSize,
- final int maxRate) throws Exception
+ final int maxRate) throws Exception
{
- FlowController flowController = null;
+ FlowController flowController = null;
- final int maxRateToUse = maxRate;
+ final int maxRateToUse = maxRate;
- // TODO Flow control disabled for now
-
+ // TODO Flow control disabled for now
+
// if (address != null)
// {
// flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
// }
-
- long id = dispatcher.generateID();
- ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
+ long id = dispatcher.generateID();
- producers.add(producer);
+ ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
- dispatcher.register(new ServerProducerPacketHandler(producer));
+ producers.add(producer);
- final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+ dispatcher.register(new ServerProducerPacketHandler(producer));
- return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
+ final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+
+ return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
}
// Public ---------------------------------------------------------------------------------------------
@@ -1063,25 +1089,25 @@
private void doAck(final MessageReference ref) throws Exception
{
- ServerMessage message = ref.getMessage();
+ ServerMessage message = ref.getMessage();
- Queue queue = ref.getQueue();
+ Queue queue = ref.getQueue();
- if (message.isDurable() && queue.isDurable())
- {
- int count = message.decrementDurableRefCount();
+ if (message.isDurable() && queue.isDurable())
+ {
+ int count = message.decrementDurableRefCount();
- if (count == 0)
- {
- persistenceManager.storeDelete(message.getMessageID());
- }
- else
- {
- persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
- }
- }
+ if (count == 0)
+ {
+ persistenceManager.storeDelete(message.getMessageID());
+ }
+ else
+ {
+ persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+ }
+ }
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged();
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,50 +21,51 @@
*/
package org.jboss.messaging.core.transaction;
-import javax.transaction.xa.Xid;
-
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
+import javax.transaction.xa.Xid;
+
/**
- *
* A JBoss Messaging internal transaction
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface Transaction
-{
- void prepare() throws Exception;
-
+{
+ void prepare() throws Exception;
+
void commit() throws Exception;
-
+
void rollback(HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
-
+
void addMessage(ServerMessage message) throws Exception;
void addAcknowledgement(MessageReference acknowledgement) throws Exception;
-
+
int getAcknowledgementsCount();
-
+
long getID();
-
+
Xid getXid();
-
+
boolean isEmpty();
-
+
void suspend();
-
+
void resume();
-
+
State getState();
-
+
boolean isContainsPersistent();
-
+
+ void markAsFailed(MessagingException messagingException);
+
static enum State
{
- ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED;
+ ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,14 +21,7 @@
*/
package org.jboss.messaging.core.transaction.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -39,310 +32,325 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
+import javax.transaction.xa.Xid;
+import java.util.*;
+
/**
- *
* A TransactionImpl
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public class TransactionImpl implements Transaction
{
- private static final Logger log = Logger.getLogger(TransactionImpl.class);
+ private static final Logger log = Logger.getLogger(TransactionImpl.class);
- private final StorageManager storageManager;
+ private final StorageManager storageManager;
- private final PostOffice postOffice;
+ private final PostOffice postOffice;
- private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+ private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
- private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+ private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
- private final Xid xid;
+ private final Xid xid;
- private final long id;
+ private final long id;
- private volatile State state = State.ACTIVE;
+ private volatile State state = State.ACTIVE;
- private volatile boolean containsPersistent;
+ private volatile boolean containsPersistent;
- public TransactionImpl(final StorageManager storageManager,
- final PostOffice postOffice)
- {
- this.storageManager = storageManager;
+ private MessagingException messagingException;
- this.postOffice = postOffice;
+ private boolean failed;
- this.xid = null;
+ public TransactionImpl(final StorageManager storageManager,
+ final PostOffice postOffice)
+ {
+ this.storageManager = storageManager;
- this.id = storageManager.generateTransactionID();
- }
+ this.postOffice = postOffice;
- public TransactionImpl(final Xid xid, final StorageManager storageManager,
- final PostOffice postOffice)
- {
- this.storageManager = storageManager;
+ this.xid = null;
- this.postOffice = postOffice;
+ this.id = storageManager.generateTransactionID();
+ }
- this.xid = xid;
+ public TransactionImpl(final Xid xid, final StorageManager storageManager,
+ final PostOffice postOffice)
+ {
+ this.storageManager = storageManager;
- this.id = storageManager.generateTransactionID();
- }
+ this.postOffice = postOffice;
- // Transaction implementation
- // -----------------------------------------------------------
+ this.xid = xid;
- public long getID()
- {
- return id;
- }
+ this.id = storageManager.generateTransactionID();
+ }
- public void addMessage(final ServerMessage message) throws Exception
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
+ // Transaction implementation
+ // -----------------------------------------------------------
- List<MessageReference> refs = postOffice.route(message);
+ public long getID()
+ {
+ return id;
+ }
- refsToAdd.addAll(refs);
+ public void addMessage(final ServerMessage message) throws Exception
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
- if (message.getDurableRefCount() != 0)
- {
- storageManager.storeMessageTransactional(id, message);
+ List<MessageReference> refs = postOffice.route(message);
- containsPersistent = true;
- }
- }
+ refsToAdd.addAll(refs);
+ if (message.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageTransactional(id, message);
+
+ containsPersistent = true;
+ }
+ }
+
public void addAcknowledgement(final MessageReference acknowledgement)
- throws Exception
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
- acknowledgements.add(acknowledgement);
+ throws Exception
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ acknowledgements.add(acknowledgement);
- ServerMessage message = acknowledgement.getMessage();
+ ServerMessage message = acknowledgement.getMessage();
- if (message.isDurable())
- {
- Queue queue = acknowledgement.getQueue();
+ if (message.isDurable())
+ {
+ Queue queue = acknowledgement.getQueue();
- if (queue.isDurable())
- {
- // Need to lock on the message to prevent a race where the ack and
- // delete
- // records get recorded in the log in the wrong order
+ if (queue.isDurable())
+ {
+ // Need to lock on the message to prevent a race where the ack and
+ // delete
+ // records get recorded in the log in the wrong order
- // TODO For now - we just use synchronized - can probably do better
- // locking
+ // TODO For now - we just use synchronized - can probably do better
+ // locking
- synchronized (message)
- {
- message.decrementDurableRefCount();
+ synchronized (message)
+ {
+ message.decrementDurableRefCount();
- if (message.getDurableRefCount() == 0)
- {
- storageManager.storeDeleteTransactional(id, message
- .getMessageID());
- }
- else
- {
- storageManager.storeAcknowledgeTransactional(id, queue
- .getPersistenceID(), message.getMessageID());
- }
+ if (message.getDurableRefCount() == 0)
+ {
+ storageManager.storeDeleteTransactional(id, message
+ .getMessageID());
+ }
+ else
+ {
+ storageManager.storeAcknowledgeTransactional(id, queue
+ .getPersistenceID(), message.getMessageID());
+ }
- containsPersistent = true;
- }
- }
- }
+ containsPersistent = true;
+ }
+ }
+ }
- }
+ }
- public void prepare() throws Exception
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
+ public void prepare() throws Exception
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
- if (xid == null)
- {
- throw new IllegalStateException("Cannot prepare non XA transaction");
- }
+ if (xid == null)
+ {
+ throw new IllegalStateException("Cannot prepare non XA transaction");
+ }
- if (containsPersistent)
- {
- storageManager.prepare(id);
- }
+ if (containsPersistent)
+ {
+ storageManager.prepare(id);
+ }
- state = State.PREPARED;
- }
+ state = State.PREPARED;
+ }
- public void commit() throws Exception
- {
- if (xid != null)
- {
- if (state != State.PREPARED)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
- }
- else
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
- }
+ public void commit() throws Exception
+ {
+ if (failed)
+ {
+ throw messagingException;
+ }
+ if (xid != null)
+ {
+ if (state != State.PREPARED)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ }
+ else
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ }
- if (containsPersistent)
- {
- storageManager.commit(id);
- }
+ if (containsPersistent)
+ {
+ storageManager.commit(id);
+ }
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
+ for (MessageReference ref : refsToAdd)
+ {
+ ref.getQueue().addLast(ref);
+ }
- for (MessageReference reference : acknowledgements)
- {
- reference.getQueue().referenceAcknowledged();
- }
+ for (MessageReference reference : acknowledgements)
+ {
+ reference.getQueue().referenceAcknowledged();
+ }
- clear();
+ clear();
- state = State.COMMITTED;
- }
+ state = State.COMMITTED;
+ }
- public void rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
- {
- if (xid != null)
- {
- if (state != State.PREPARED && state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
- }
- else
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
- }
+ public void rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+ {
+ if (xid != null)
+ {
+ if (state != State.PREPARED && state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ }
+ else
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ }
- if (containsPersistent)
- {
- storageManager.rollback(id);
- }
+ if (containsPersistent)
+ {
+ storageManager.rollback(id);
+ }
- Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+ Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
- // We sort into lists - one for each queue involved.
- // Then we cancel back atomicly for each queue adding list on front to
- // guarantee ordering is preserved
+ // We sort into lists - one for each queue involved.
+ // Then we cancel back atomicly for each queue adding list on front to
+ // guarantee ordering is preserved
- for (MessageReference ref : acknowledgements)
- {
- Queue queue = ref.getQueue();
+ for (MessageReference ref : acknowledgements)
+ {
+ Queue queue = ref.getQueue();
- ServerMessage message = ref.getMessage();
+ ServerMessage message = ref.getMessage();
- if (message.isDurable() && queue.isDurable())
- {
- // Reverse the decrements we did in the tx
- message.incrementDurableRefCount();
- }
+ if (message.isDurable() && queue.isDurable())
+ {
+ // Reverse the decrements we did in the tx
+ message.incrementDurableRefCount();
+ }
- LinkedList<MessageReference> list = queueMap.get(queue);
+ LinkedList<MessageReference> list = queueMap.get(queue);
- if (list == null)
- {
- list = new LinkedList<MessageReference>();
+ if (list == null)
+ {
+ list = new LinkedList<MessageReference>();
- queueMap.put(queue, list);
- }
+ queueMap.put(queue, list);
+ }
- if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
- {
- list.add(ref);
- }
- }
+ if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ list.add(ref);
+ }
+ }
- for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap
- .entrySet())
- {
- LinkedList<MessageReference> refs = entry.getValue();
+ for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap
+ .entrySet())
+ {
+ LinkedList<MessageReference> refs = entry.getValue();
- entry.getKey().addListFirst(refs);
- }
+ entry.getKey().addListFirst(refs);
+ }
- clear();
+ clear();
- state = State.ROLLEDBACK;
- }
+ state = State.ROLLEDBACK;
+ }
- public int getAcknowledgementsCount()
- {
- return acknowledgements.size();
- }
+ public int getAcknowledgementsCount()
+ {
+ return acknowledgements.size();
+ }
- public void suspend()
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Can only suspend active transaction");
- }
- state = State.SUSPENDED;
- }
+ public void suspend()
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Can only suspend active transaction");
+ }
+ state = State.SUSPENDED;
+ }
- public void resume()
- {
- if (state != State.SUSPENDED)
- {
- throw new IllegalStateException("Can only resume a suspended transaction");
- }
- state = State.ACTIVE;
- }
+ public void resume()
+ {
+ if (state != State.SUSPENDED)
+ {
+ throw new IllegalStateException("Can only resume a suspended transaction");
+ }
+ state = State.ACTIVE;
+ }
- public Transaction.State getState()
- {
- return state;
- }
+ public Transaction.State getState()
+ {
+ return state;
+ }
- public Xid getXid()
- {
- return xid;
- }
+ public Xid getXid()
+ {
+ return xid;
+ }
- public boolean isEmpty()
- {
- return refsToAdd.isEmpty() && acknowledgements.isEmpty();
- }
+ public boolean isEmpty()
+ {
+ return refsToAdd.isEmpty() && acknowledgements.isEmpty();
+ }
- public boolean isContainsPersistent()
- {
- return containsPersistent;
- }
+ public boolean isContainsPersistent()
+ {
+ return containsPersistent;
+ }
- public void setContainsPersistent(final boolean containsPersistent)
- {
- this.containsPersistent = containsPersistent;
- }
+ public void markAsFailed(MessagingException messagingException)
+ {
+ this.failed = true;
+ this.messagingException = messagingException;
+ }
- // Private
- // -------------------------------------------------------------------
+ public void setContainsPersistent(final boolean containsPersistent)
+ {
+ this.containsPersistent = containsPersistent;
+ }
- private void clear()
- {
- refsToAdd.clear();
+ // Private
+ // -------------------------------------------------------------------
- acknowledgements.clear();
- }
+ private void clear()
+ {
+ refsToAdd.clear();
+
+ acknowledgements.clear();
+ }
}
More information about the jboss-cvs-commits
mailing list