[jboss-cvs] JBoss Messaging SVN: r8301 - in branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss: jms/client/delegate and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 16 04:44:05 EDT 2011
Author: bershath27
Date: 2011-05-16 04:44:04 -0400 (Mon, 16 May 2011)
New Revision: 8301
Modified:
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java
branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
Log:
JBPAPP-6386
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -150,8 +150,6 @@
public Object handleStop(Invocation invocation) throws Throwable
{
ConnectionState currentState = getConnectionState(invocation);
- // If the session is being sent to the Cache, we need to clear the Thread because of the ContextClassLoader
- currentState.clearExecutors();
currentState.setStarted(false);
currentState.setJustCreated(false);
return invocation.invokeNext();
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -122,10 +122,16 @@
int ackMode = ((Integer)mi.getArguments()[1]).intValue();
boolean xa = ((Boolean)mi.getArguments()[2]).booleanValue();
+ boolean isCC = false;
+ if (mi.getArguments().length >= 4)
+ {
+ isCC = ((Boolean)mi.getArguments()[3]).booleanValue();
+ }
+
SessionState sessionState =
new SessionState(connectionState, sessionDelegate, transacted,
ackMode, xa, sessionDelegate.getDupsOKBatchSize(),
- connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName());
+ connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName(), isCC);
delegate.setState(sessionState);
return delegate;
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -44,6 +44,7 @@
import org.jboss.jms.wireformat.CloseRequest;
import org.jboss.jms.wireformat.ClosingRequest;
import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
+import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest2;
import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
@@ -179,9 +180,7 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- public SessionDelegate createSessionDelegate(boolean transacted,
- int acknowledgmentMode,
- boolean isXA) throws JMSException
+ public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
{
RequestSupport req =
new ConnectionCreateSessionDelegateRequest(id, version, transacted,
@@ -190,7 +189,18 @@
return (SessionDelegate)doInvoke(client, req);
}
+ public SessionDelegate createSessionDelegate(boolean transacted,
+ int acknowledgmentMode,
+ boolean isXA, boolean isCC) throws JMSException
+ {
+ RequestSupport req =
+ new ConnectionCreateSessionDelegateRequest2(id, version, transacted,
+ acknowledgmentMode, isXA, isCC);
+ return (SessionDelegate)doInvoke(client, req);
+ }
+
+
public String getClientID() throws JMSException
{
RequestSupport req = new ConnectionGetClientIDRequest(id, version);
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -172,7 +172,7 @@
ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
createSessionDelegate(sessionState.isTransacted(),
sessionState.getAcknowledgeMode(),
- sessionState.isXA());
+ sessionState.isXA(), sessionState.isCC());
sessionDelegate.synchronizeWith(newSessionDelegate);
}
@@ -183,25 +183,6 @@
}
// Public ---------------------------------------------------------------------------------------
-
- /**
- *
- * Session Executors will store the ClassLoader on the threads they were created.
- * When a JMSSession is reused by JCA it can reuse it on different application context (i.e. different ClassLoaders)
- * This method should be called before the Session is stored on the JCA's cache
- * @throws InterruptedException
- *
- * */
- public void clearExecutors() throws InterruptedException
- {
- Iterator iterator = getChildren().iterator();
- while (iterator.hasNext())
- {
- SessionState state = (SessionState) iterator.next();
- state.clearExecutorThread();
- }
-
- }
public ResourceManager getResourceManager()
{
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -126,11 +126,13 @@
private String defaultOrderingGroupName;
+ private boolean isCC;
+
// Constructors ---------------------------------------------------------------------------------
-
+
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
boolean transacted, int ackMode, boolean xa,
- int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName)
+ int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName, boolean isCC)
{
super(parent, (DelegateSupport)delegate);
@@ -140,6 +142,7 @@
this.acknowledgeMode = ackMode;
this.transacted = transacted;
this.xa = xa;
+ this.isCC = isCC;
this.dupsOKBatchSize = dupsOKBatchSize;
@@ -380,11 +383,6 @@
}
// Public ---------------------------------------------------------------------------------------
-
- public void clearExecutorThread() throws InterruptedException
- {
- this.executor.clearClassLoader();
- }
public void setTreatAsNonTransactedWhenNotEnlisted(boolean b)
{
@@ -439,6 +437,11 @@
return xa;
}
+ public boolean isCC()
+ {
+ return isCC;
+ }
+
public MessagingXAResource getXAResource()
{
return xaResource;
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -43,6 +43,10 @@
int acknowledgmentMode,
boolean isXA) throws JMSException;
+ SessionDelegate createSessionDelegate(boolean transacted,
+ int acknowledgmentMode,
+ boolean isXA, boolean isCC) throws JMSException;
+
String getClientID() throws JMSException;
void setClientID(String id) throws JMSException;
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -224,16 +224,21 @@
// ConnectionDelegate implementation ------------------------------------------------------------
+ public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
+ {
+ return createSessionDelegate(transacted, acknowledgmentMode, isXA, false);
+ }
+
public SessionDelegate createSessionDelegate(boolean transacted,
int acknowledgmentMode,
- boolean isXA)
+ boolean isXA, boolean isCC)
throws JMSException
{
try
{
log.trace(this + " creating " + (transacted ? "transacted" : "non transacted") +
" session, " + Util.acknowledgmentMode(acknowledgmentMode) + ", " +
- (isXA ? "XA": "non XA"));
+ (isXA ? "XA": "non XA") + ", " + (isCC ? "CC" : "non CC"));
if (closed)
{
@@ -248,6 +253,11 @@
//Note we only replicate transacted and client acknowledge sessions.
ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+
+ if (isCC)
+ {
+ ep.setCC();
+ }
synchronized (sessions)
{
@@ -364,6 +374,11 @@
public void close() throws JMSException
{
+ close(false);
+ }
+
+ public void close(boolean isFromFailure) throws JMSException
+ {
try
{
//reason for synchronization
@@ -392,7 +407,7 @@
{
ServerSessionEndpoint sess = (ServerSessionEndpoint)i.next();
- sess.localClose();
+ sess.localClose(isFromFailure);
}
sessions.clear();
@@ -522,12 +537,14 @@
// for 2pc rollback - we just don't cancel any messages back to the channel; this is
// driven from the client side.
+ // if the transaction rollback comes from recovery, we need to cancel the messages
+ // see JBMESSAGING-1786
Transaction tx = tr.getPreparedTx(request.getXid());
if (trace) { log.trace(this + " rolling back " + tx); }
- tx.rollback();
+ tx.rollback(request.getState().isRecovered());
}
if (trace) { log.trace(this + " processed transaction successfully"); }
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.server.endpoint;
+import java.util.List;
+
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -35,10 +37,13 @@
import org.jboss.jms.server.selector.Selector;
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.Channel;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.contract.Receiver;
@@ -322,6 +327,26 @@
firstTime = false;
}
+ //now for a remote sucker, we need to update the message's status
+ if (remote)
+ {
+ PersistenceManager pm = sessionEndpoint.getPersistenceManager();
+ if (ref.getMessage().isReliable() && messageQueue.isRecoverable())
+ {
+ try
+ {
+ pm.updateMessageState(messageQueue.getChannelID(), ref, "S");
+ }
+ catch (Exception e)
+ {
+ //we need to stop the sucking process. the message should be re-delivered.
+ log.error("Failed to update state for message: " + ref, e);
+ return null;
+ }
+ }
+ delivery.setSucked(true);
+ }
+
try
{
sessionEndpoint.handleDelivery(delivery, this);
@@ -539,8 +564,13 @@
ServerPeer sp = sessionEndpoint.getConnectionEndpoint().getServerPeer();
- Queue queue = postOffice.getBindingForQueueName(queueName).queue;
+ Binding binding = postOffice.getBindingForQueueName(queueName);
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1801
+ if (binding == null) return;
+
+ Queue queue = binding.queue;
+
ManagedDestination mDest = sp.getDestinationManager().getDestination(destination.getName(), false);
if (!queue.isRecoverable())
@@ -647,6 +677,16 @@
sessionEndpoint.promptDelivery(messageQueue);
}
+ public long getChannelID()
+ {
+ return messageQueue.getChannelID();
+ }
+
+ public Channel getChannel()
+ {
+ return messageQueue;
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@@ -74,11 +75,13 @@
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PersistenceManager.ReferenceInfo;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.core.impl.IDManager;
import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionException;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -187,7 +190,14 @@
private long lastSequence = -1;
+ private Map<Long, Long> failureCanceledDels;
+ private AtomicBoolean isSuckerSession = new AtomicBoolean(false);
+
+ private boolean isCC = false;
+
+ private boolean markClose = false;
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -236,6 +246,8 @@
defaultRedeliveryDelay = sp.getDefaultRedeliveryDelay();
deliveries = new ConcurrentHashMap();
+
+ failureCanceledDels = new HashMap<Long, Long>();
}
// SessionDelegate implementation ---------------------------------------------------------------
@@ -333,6 +345,30 @@
public void close() throws JMSException
{
+ if (isCC)
+ {
+ markClose = true;
+ checkClose();
+ }
+ else
+ {
+ trueClose();
+ }
+ }
+
+ public void checkClose() throws JMSException
+ {
+ if (trace) {log.trace(this + " checking cc closing: " + markClose + " counter: " + deliveries.size()); }
+
+ if (markClose && deliveries.size() == 0)
+ {
+ if (trace) {log.trace(this + " closing CC session now.");}
+ trueClose();
+ }
+ }
+
+ public void trueClose() throws JMSException
+ {
try
{
localClose();
@@ -1133,9 +1169,14 @@
}
}
}
-
+
void localClose() throws Throwable
{
+ localClose(false);
+ }
+
+ void localClose(boolean isFromFailure) throws Throwable
+ {
if (closed)
{
@@ -1154,9 +1195,13 @@
consumersClone = new HashMap(consumers);
}
+ List<Channel> curChannels = new ArrayList<Channel>();
+
for( Iterator i = consumersClone.values().iterator(); i.hasNext(); )
{
- ((ServerConsumerEndpoint)i.next()).localClose();
+ ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
+ curChannels.add(consumer.getChannel());
+ consumer.localClose();
}
consumers.clear();
@@ -1213,6 +1258,33 @@
DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+ // https://jira.jboss.org/browse/JBMESSAGING-1828
+ if (rec == null)
+ {
+ continue;
+ }
+
+ //for a suck delivery, we need to update the state back to 'C'
+ if (rec.del.isSucked())
+ {
+ //we need to reverse the message (if still there). If reverse failed, we don't do
+ //cancel.
+ if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+ {
+ try
+ {
+ //now ask pm to do it.
+ pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+ }
+ catch (Exception e)
+ {
+ //if update failed, it must be a DB failure, we log the error and let others be canceled
+ log.error("Failed to update message " + rec.del.getReference() + " to state C", e);
+ continue;
+ }
+ }
+ }
+
/*
* https://jira.jboss.org/jira/browse/JBMESSAGING-1440
*/
@@ -1222,7 +1294,54 @@
}
channels.add(rec.del.getObserver());
+
+ if (isFromFailure)
+ {
+ failureCanceledDels.put(rec.deliveryID, rec.deliveryID);
+ }
}
+
+ if (isSuckerSession.get())
+ {
+ //here we handle the rare case where a sucker acked a message but then crashed,
+ //so the message is not updated to the target channel and the session has already
+ //forgotten it. We take this chance here
+ //to load those messages into the channels and redeliver them
+ for (Channel ch : curChannels)
+ {
+ List<ReferenceInfo> refs = pm.claimMessagesInSuck(ch.getChannelID());
+
+ if (refs.size() > 0)
+ {
+ List<Long> mids = new ArrayList<Long>();
+ Map<Long, ReferenceInfo> refInfoMap = new HashMap<Long, ReferenceInfo>();
+
+ for (ReferenceInfo refInfo : refs)
+ {
+ mids.add(refInfo.getMessageId());
+ refInfoMap.put(refInfo.getMessageId(), refInfo);
+ }
+
+ List messages = pm.getMessages(mids);
+
+ iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message m = (Message)iter.next();
+ MessageReference mref = ms.reference(m);
+
+ ReferenceInfo mInfo = refInfoMap.get(m.getMessageID());
+ mref.setDeliveryCount(mInfo.getDeliveryCount());
+ mref.setScheduledDeliveryTime(mInfo.getScheduledDelivery());
+
+ Delivery del = new SimpleDelivery(ch, mref, true, true);
+ del.cancel();
+ }
+ channels.add(ch);
+ }
+ }
+ }
promptDelivery(channels);
@@ -1600,19 +1719,40 @@
synchronized (deliveries)
{
rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ if (rec == null)
+ {
+ //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
+ //and has failed over since recoverDeliveries won't have been called
+ if (trace)
+ {
+ log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+ }
+ return null;
+ }
+ //now we check for suckers
+ if (rec.del.isSucked())
+ {
+ //we need to reverse the message (if still there). If reverse failed, we don't do
+ //cancel.
+ if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+ {
+ try
+ {
+ //now ask pm to do it.
+ pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+ }
+ catch (Exception e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to update message " + rec.del.getReference() + " to state C");
+ }
+ return null;
+ }
+ }
+ }
}
- if (rec == null)
- {
- //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
- //and has failed over since recoverDeliveries won't have been called
- if (trace)
- {
- log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
- }
- return null;
- }
-
//Note we check the flag *and* evaluate again, this is because the server and client clocks may
//be out of synch and don't want to send back to the client a message it thought it has sent to
//the expiry queue
@@ -1666,6 +1806,11 @@
//Need to send a message to the replicant to remove the id
postOffice.sendReplicateAckMessage(rec.queueName, del.getReference().getMessage().getMessageID());
+
+ if (isCC)
+ {
+ checkClose();
+ }
return rec.del;
}
@@ -1762,15 +1907,29 @@
synchronized (deliveries)
{
rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ if (rec == null)
+ {
+ //This can happen in one of the two cases:
+ //
+ //1. If an ack comes in after failover, or
+ //2. The session is closed due to server side connection failure notification processing.
+ //When a connection failure is detected at the server end, it will close all related server side
+ //sessions. As part of closing, any un-acked message will be canceled.
+ //if a normal client side ack comes in just after the session is thus being closed, and this ack
+ //has just been canceled, the client side ack will end up here.
+ //
+ //We treat the cases differently. For case 1, we can safely ignore it.
+ //For case 2, we must throw an exception to indicate that the ack failed and the message will be re-delivered.
+ if (failureCanceledDels.remove(ack.getDeliveryID()) != null)
+ {
+ //ack should fail
+ throw new JMSException("Message already canceled before this ack " + ack + " and the message will be redelivered.");
+ }
+ log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+ return false;
+ }
}
- if (rec == null)
- {
- //This can happen if an ack comes in after failover
- log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
- return false;
- }
-
ServerConsumerEndpoint consumer = rec.getConsumer();
if (consumer != null && consumer.isRemote())
@@ -1791,6 +1950,11 @@
}
if (trace) { log.trace(this + " acknowledged delivery " + ack); }
+
+ if (isCC)
+ {
+ checkClose();
+ }
return true;
}
@@ -1829,12 +1993,12 @@
JBossDestination dest = new JBossQueue(queueName);
- //We don't care about redelivery delays and number of attempts for a direct consumer
-
+ //We don't care about redelivery delays for a direct consumer
+ //We do care about number of attempts, see JBMESSAGING-1774
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
- dest, null, null, 0, -1, true, false, prefetchSize);
+ dest, null, null, 0, 1, true, false, prefetchSize);
ConsumerAdvised advised;
@@ -1856,6 +2020,8 @@
}
log.trace(this + " created and registered " + ep);
+
+ isSuckerSession.set(true);
return stub;
}
@@ -2392,14 +2558,66 @@
throw new TransactionException("Failed to handle send ack", e);
}
}
+
+ if (isCC && del != null)
+ {
+ try
+ {
+ checkClose();
+ }
+ catch (JMSException e)
+ {
+ //we don't need to do anything here.
+ log.warn("Exception closing a CC session " + this);
+ }
+ }
}
}
public void afterRollback(boolean onePhase) throws TransactionException
{
- //One phase rollbacks never hit the server - they are dealt with locally only
- //so this would only ever be executed for a two phase rollback.
+ if (log.isTraceEnabled()) { log.trace(this + " afterRollback, onePhase: " + onePhase); }
+ //One phase rollbacks usually don't hit the server - they are dealt with locally only
+ //but if one-phase commit fails, we need to rollback the delivery
+ if (onePhase)
+ {
+ // Remove the deliveries from the delivery map.
+ Iterator iter = delList.iterator();
+ while (iter.hasNext())
+ {
+ Long deliveryId = (Long)iter.next();
+ DeliveryRecord del = (DeliveryRecord)deliveries.remove(deliveryId);
+
+ if (del != null && del.replicating)
+ {
+ //TODO - we could batch this in one message
+ try
+ {
+ postOffice.sendReplicateAckMessage(del.queueName, del.del.getReference().getMessage().getMessageID());
+ }
+ catch (Exception e)
+ {
+ throw new TransactionException("Failed to handle send ack", e);
+ }
+ }
+
+ if (isCC && del != null)
+ {
+ try
+ {
+ checkClose();
+ }
+ catch (JMSException e)
+ {
+ //we don't need to do anything here.
+ log.warn("Exception closing a CC session " + this);
+ }
+ }
+ }
+ }
+
+ //for a two phase rollback.
//We don't do anything since cancellation is driven from the client.
}
@@ -2409,4 +2627,19 @@
}
}
+ public PersistenceManager getPersistenceManager()
+ {
+ return pm;
+ }
+
+ public void setCC()
+ {
+ isCC = true;
+ }
+
+ public boolean isCC()
+ {
+ return isCC;
+ }
+
}
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -68,11 +68,16 @@
return endpoint.closing(sequence);
}
+ public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
+ {
+ return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA);
+ }
+
public SessionDelegate createSessionDelegate(boolean transacted,
int acknowledgmentMode,
- boolean isXA) throws JMSException
+ boolean isXA, boolean isCC) throws JMSException
{
- return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA);
+ return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA, isCC);
}
public String getClientID() throws JMSException
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -63,6 +63,8 @@
private List sessionStatesList;
private boolean clientSide;
+
+ private boolean recovered;
// Static --------------------------------------------------------
@@ -70,7 +72,13 @@
public ClientTransaction()
{
+ this(false);
+ }
+
+ public ClientTransaction(boolean isRecovered)
+ {
clientSide = true;
+ recovered = isRecovered;
}
// Public --------------------------------------------------------
@@ -215,6 +223,17 @@
}
}
+ public boolean isRecovered()
+ {
+ return recovered;
+ }
+
+ public void setRecovered(boolean b)
+ {
+ recovered = b;
+ }
+
+
// Streamable implementation ---------------------------------
public void write(DataOutputStream out) throws Exception
@@ -272,6 +291,7 @@
out.writeLong(Long.MIN_VALUE);
}
}
+ out.writeBoolean(recovered);
}
@@ -315,6 +335,8 @@
sessionState.addAck(new DefaultAck(l));
}
}
+
+ recovered = in.readBoolean();
}
// Protected -----------------------------------------------------
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -91,6 +91,7 @@
public static final int REQ_CONNECTION_STOP = 205;
public static final int REQ_CONNECTION_SENDTRANSACTION = 206;
public static final int REQ_CONNECTION_GETPREPAREDTRANSACTIONS = 207;
+ public static final int REQ_CONNECTION_CREATESESSIONDELEGATE2 = 208;
// Session
// -------
@@ -226,6 +227,9 @@
case REQ_CONNECTION_CREATESESSIONDELEGATE:
packet = new ConnectionCreateSessionDelegateRequest();
break;
+ case REQ_CONNECTION_CREATESESSIONDELEGATE2:
+ packet = new ConnectionCreateSessionDelegateRequest2();
+ break;
case REQ_CONNECTION_GETCLIENTID:
packet = new ConnectionGetClientIDRequest();
break;
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -53,4 +53,8 @@
* Mark if this delivery is with a prepared XA transaction.
*/
boolean isXAPrepared();
+
+ boolean isSucked();
+
+ void setSucked(boolean isSucked);
}
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -87,6 +87,24 @@
void addTransaction(Transaction tx);
+ //drop all messages that belong to a channel.
+ void dropChannelMessages(long channelID) throws Exception;
+
+ //merge messages from one channel to another.
+ void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+ //set whether storing the transaction creation time is supported.
+ void setSupportsTxAge(boolean supportsTxAge);
+
+ //whether storing the transaction creation time is supported
+ boolean supportsTxAge();
+
+ //update the status of the message
+ void updateMessageState(long channelID, MessageReference ref, String state) throws Exception;
+
+ //update to 'C' the state of the channel's messages whose state is 'S', and return their reference info (including message ids)
+ List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception;
+
// Interface value classes ----------------------------------------------------------------------
class MessageChannelPair
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -36,7 +36,7 @@
import org.jboss.messaging.util.StreamUtils;
import org.jboss.messaging.util.Util;
-import javax.jms.Session;
+import javax.jms.JMSException;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.xa.Xid;
@@ -53,6 +53,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:hgao at jboss.org">Howard Gao</a>
*
* @version <tt>1.1</tt>
*
@@ -102,8 +103,9 @@
private int idCacheCounter = 0;
private final int idCacheSize;
+
+ private boolean supportsTxAge;
-
// Constructors --------------------------------------------------
public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -322,6 +324,10 @@
ResultSet rs = null;
PreparedTxInfo txInfo = null;
TransactionWrapper wrap = new TransactionWrapper();
+
+ PreparedStatement exst = null;
+ ResultSet exrs = null;
+ Map<Long, Long> txTimes = new HashMap<Long, Long>();
try
{
@@ -329,6 +335,27 @@
conn = ds.getConnection();
+ if (supportsTxAge)
+ {
+ try
+ {
+ exst = conn.prepareStatement(getSQLStatement("SELECT_TRANSACTION_START_TIME_EXTRA"));
+ exrs = exst.executeQuery();
+
+ while (exrs.next())
+ {
+ long txId = exrs.getLong(1);
+ long txTime = exrs.getLong(2);
+ txTimes.put(txId, txTime);
+ }
+ }
+ finally
+ {
+ closeResultSet(exrs);
+ closeStatement(exst);
+ }
+ }
+
st = conn
.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
@@ -350,7 +377,7 @@
Xid xid = new MessagingXid(branchQual, formatId, globalTxId);
// create a tx info object with the result set detailsdetails
- txInfo = new PreparedTxInfo(txId, xid);
+ txInfo = new PreparedTxInfo(txId, xid, txTimes.get(txId));
transactions.add(txInfo);
}
@@ -1134,6 +1161,169 @@
}
}
+   /**
+    * Removes the persisted messages and message references of a channel.
+    * <p>
+    * The work runs in a {@link JDBCTxRunner2} started through <code>executeWithRetry()</code>:
+    * for every row returned by the LOAD_REFS statement for the channel, the message id in
+    * column 1 is passed to DELETE_CHANNEL_MESSAGE; afterwards the channel's references are
+    * removed with DELETE_CHANNEL_MESSAGE_REF.
+    *
+    * @param channelID id of the channel whose messages are dropped
+    * @throws Exception if the database work fails
+    * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+    */
+   public void dropChannelMessages(final long channelID) throws Exception
+   {
+      class ChannelDropRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+
+               // NOTE(review): DELETE_CHANNEL_MESSAGE deletes by message id only. If a message
+               // can still be referenced from another channel it is removed there as well -
+               // confirm that this is intended.
+               ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+               while (rs.next())
+               {
+                  long mid = rs.getLong(1);
+                  ps2.setLong(1, mid);
+                  ps2.executeUpdate();
+               }
+               ps2.close();
+
+               // Release the LOAD_REFS statement before the variable is reused below
+               ps.close();
+
+               ps = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+               ps.setLong(1, channelID);
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  // Previously logged "Update page ord updated", copied from the merge code
+                  log.trace("Delete channel message refs deleted " + rows + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+
+      new ChannelDropRunner().executeWithRetry();
+   }
+
+   /**
+    * Moves the persisted message references of channel <code>fromID</code> to channel
+    * <code>toID</code>.
+    * <p>
+    * Column 2 of SELECT_MIN_MAX_PAGE_ORD is read for <code>toID</code>. When it is not NULL,
+    * every reference loaded for <code>fromID</code> gets a consecutive page ordering that
+    * starts right after that value. Finally UPDATE_CHANNEL_ID is executed with
+    * (<code>toID</code>, <code>fromID</code>).
+    *
+    * @param fromID id of the channel the messages are taken from
+    * @param toID id of the channel the messages are added to
+    * @throws IllegalArgumentException if both ids are equal
+    * @throws Exception if the database work fails
+    * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+    */
+   public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+   {
+      if (fromID == toID)
+      {
+         throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
+      }
+
+      class ChannelMergeRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement stmt = null;
+            ResultSet results = null;
+            PreparedStatement orderStmt = null;
+
+            try
+            {
+               // Highest page ordering currently used by the target channel
+               stmt = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+               stmt.setLong(1, toID);
+               results = stmt.executeQuery();
+               results.next();
+
+               long maxOrdering = results.getLong(2);
+               // A NULL maximum leaves the next ordering at 0, which skips the re-ordering below
+               long nextOrdering = results.wasNull() ? 0 : maxOrdering + 1;
+
+               results.close();
+               stmt.close();
+
+               if (nextOrdering > 0)
+               {
+                  // Append the source references behind the target channel's existing ones
+                  stmt = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+                  orderStmt = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+                  stmt.setLong(1, fromID);
+                  results = stmt.executeQuery();
+
+                  while (results.next())
+                  {
+                     long msgId = results.getLong(1);
+
+                     orderStmt.setLong(1, nextOrdering);
+                     orderStmt.setLong(2, msgId);
+                     orderStmt.setLong(3, fromID);
+
+                     int reordered = orderStmt.executeUpdate();
+
+                     if (trace)
+                     {
+                        log.trace("Update page ord updated " + reordered + " rows");
+                     }
+
+                     nextOrdering++;
+                  }
+                  orderStmt.close();
+                  stmt.close();
+               }
+
+               // Re-point the references from the source channel to the target channel
+               stmt = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+               stmt.setLong(1, toID);
+               stmt.setLong(2, fromID);
+
+               int moved = stmt.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Update channel id updated " + moved + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(results);
+               closeStatement(stmt);
+               closeStatement(orderStmt);
+            }
+            return null;
+         }
+      }
+
+      new ChannelMergeRunner().executeWithRetry();
+   }
+
public InitialLoadInfo mergeAndLoad(final long fromChannelID,
final long toChannelID, final int numberToLoad,
final long firstPagingOrder, final long nextPagingOrder)
@@ -1448,6 +1638,12 @@
{
log.trace("Updated " + rows + " rows");
}
+
+ if (rows == 0)
+ {
+ //no message updated, should be canceled back already
+ throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+ }
return null;
}
@@ -2188,6 +2384,36 @@
}
closeStatement(ps);
}
+
+ if (supportsTxAge)
+ {
+ try
+ {
+ statement = getSQLStatement("INSERT_TRANSACTION_EXTRA");
+
+ ps = conn.prepareStatement(statement);
+
+ ps.setLong(1, tx.getId());
+
+ ps.setLong(2, tx.getCreationTime());
+
+ rows = ps.executeUpdate();
+ }
+ finally
+ {
+ if (trace)
+ {
+ String s = JDBCUtil.statementToString(statement,
+ new Integer(nodeID),
+ new Long(tx.getId()),
+ "<byte-array>",
+ new Integer(formatID),
+ "<byte-array>");
+ log.trace(s + (rows == -1 ? " failed!" : " inserted " + rows + " row(s)"));
+ }
+ closeStatement(ps);
+ }
+ }
}
protected void removeTXRecord(Connection conn, Transaction tx)
@@ -2222,6 +2448,30 @@
{
closeStatement(ps);
}
+
+ if (supportsTxAge)
+ {
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("DELETE_TRANSACTION_EXTRA"));
+
+ ps.setLong(1, tx.getId());
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(JDBCUtil.statementToString(
+ getSQLStatement("DELETE_TRANSACTION_EXTRA"), new Integer(nodeID),
+ new Long(tx.getId()))
+ + " removed " + rows + " row(s)");
+ }
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
}
protected void addReference(long channelID, MessageReference ref,
@@ -2525,8 +2775,7 @@
if (i == null) { return null; }
- is = new BufferedInputStream(rs.getBinaryStream(columnIndex),
- BUFFER_SIZE);
+ is = new BufferedInputStream(i, BUFFER_SIZE);
os = new ByteArrayOutputStream(BUFFER_SIZE);
@@ -2597,6 +2846,11 @@
"CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
// Id cache
map.put("CREATE_ID_CACHE", "CREATE TABLE JBM_ID_CACHE (NODE_ID INTEGER, CNTR INTEGER, JBM_ID VARCHAR(255), PRIMARY KEY(NODE_ID, CNTR))");
+
+ // Transaction Extra
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1772
+ map.put("CREATE_TRANSACTION_EXTRA", "CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID))");
+
return map;
}
@@ -2668,6 +2922,10 @@
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("DELETE_MESSAGE",
"DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+ map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+ map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?");
+
+
// Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
@@ -2691,6 +2949,16 @@
// Other
map.put("SELECT_ALL_CHANNELS",
"SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF");
+
+ //Transaction Extra
+ map.put("SELECT_TRANSACTION_START_TIME_EXTRA", "SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX");
+ map.put("INSERT_TRANSACTION_EXTRA", "INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)");
+ map.put("DELETE_TRANSACTION_EXTRA", "DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?");
+
+ //sucker use
+ map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
+ map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
+ map.put("LOAD_REFS_IN_SUCK", "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
return map;
}
@@ -3043,4 +3311,107 @@
}
}
+ public void setSupportsTxAge(boolean supportsTxAge) // stores the flag returned by supportsTxAge()
+ {
+ this.supportsTxAge = supportsTxAge;
+ }
+
+ public boolean supportsTxAge() // returns the flag stored by setSupportsTxAge(boolean)
+ {
+ return supportsTxAge;
+ }
+
+   /**
+    * Executes UPDATE_MESSAGE_STATE for one message reference of a channel, binding the new
+    * state, the message id and the channel id in that order.
+    *
+    * @param channelID id of the channel the reference belongs to
+    * @param ref reference whose message id selects the row
+    * @param c the state value to store
+    * @throws JMSException if the statement did not update exactly one row
+    * @throws Exception if the database work fails
+    */
+   public void updateMessageState(final long channelID, final MessageReference ref, final String c) throws Exception
+   {
+      class UpdateMessageStateRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement stateUpdate = null;
+
+            try
+            {
+               stateUpdate = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_STATE"));
+               stateUpdate.setString(1, c);
+               stateUpdate.setLong(2, ref.getMessage().getMessageID());
+               stateUpdate.setLong(3, channelID);
+
+               final int updated = stateUpdate.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Updated " + updated + " rows");
+               }
+
+               if (updated == 1)
+               {
+                  return null;
+               }
+
+               // Anything but exactly one updated row is reported as a failure
+               throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+            }
+            finally
+            {
+               closeStatement(stateUpdate);
+            }
+         }
+      }
+
+      new UpdateMessageStateRunner().executeWithRetry();
+   }
+
+   /**
+    * Claims the message references of a channel that are in state 'S'.
+    * <p>
+    * The references are read with LOAD_REFS_IN_SUCK and CLAIM_MESSAGE_IN_SUCK is executed
+    * for each of them, which sets the state to 'C' where it still is 'S'. One
+    * {@link ReferenceInfo} (message id, delivery count, scheduled delivery) is returned per
+    * row that was read.
+    *
+    * @param channelID id of the channel whose references are claimed
+    * @return the reference infos in the order they were read, empty if none
+    * @throws Exception if the database work fails
+    */
+   public List<ReferenceInfo> claimMessagesInSuck(final long channelID) throws Exception
+   {
+      final List<ReferenceInfo> msgIDs = new ArrayList<ReferenceInfo>();
+
+      class MessageClaimRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+
+            // The list is shared with the enclosing method: start empty so that a repeated
+            // run of this method (should executeWithRetry() invoke it again) adds no duplicates
+            msgIDs.clear();
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS_IN_SUCK"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+
+               // Prepared once and reused for every row. Preparing it inside the loop left
+               // all but the last statement unclosed.
+               ps2 = conn.prepareStatement(getSQLStatement("CLAIM_MESSAGE_IN_SUCK"));
+               ps2.setLong(1, channelID);
+
+               while (rs.next())
+               {
+                  long msgId = rs.getLong(1);
+                  int deliveryCount = rs.getInt(2);
+                  long sched = rs.getLong(3);
+
+                  ps2.setLong(2, msgId);
+                  int rows = ps2.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace("Message in suck claimed " + rows + " rows for message " + msgId);
+                  }
+
+                  // NOTE(review): the reference is returned even when rows == 0, i.e. when
+                  // the claim did not change anything - confirm callers expect that.
+                  msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return msgIDs;
+         }
+      }
+      new MessageClaimRunner().executeWithRetry();
+      return msgIDs;
+   }
+
}
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -116,6 +116,15 @@
// NOOP
}
+ public void setSupportsTxAge(boolean supportsTxAge) // NOOP: the value is ignored
+ {
+ }
+
+ public boolean supportsTxAge() // always false in this implementation
+ {
+ return false;
+ }
+
public void addTransaction(Transaction tx)
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -211,6 +220,23 @@
return timeMark;
}
+ public void dropChannelMessages(long channelID) throws Exception // NOOP
+ {
+ }
+
+ public void mergeChannelMessage(long fromID, long toID) throws Exception // NOOP
+ {
+ }
+
+ public void updateMessageState(long channelID, MessageReference ref, String state) throws Exception // NOOP
+ {
+ }
+
+ public List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception // nothing to claim here
+ {
+ return Collections.emptyList(); // always an empty list
+ }
+
}
class IDCounter
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -51,6 +51,7 @@
private MessageReference reference;
private boolean recovered;
private Transaction tx;
+ private boolean isSucked;
private boolean trace = log.isTraceEnabled();
@@ -136,6 +137,16 @@
return false;
}
+ public void setSucked(boolean sucked) // stores the flag returned by isSucked()
+ {
+ isSucked = sucked;
+ }
+
+ public boolean isSucked() // returns the flag stored by setSucked(boolean)
+ {
+ return isSucked;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -36,6 +36,8 @@
private long txId;
private Xid xid = null;
+
+ private long creationTime = Long.MIN_VALUE;
// Constructors ------------------------------------------------------------
@@ -45,6 +47,14 @@
setXid(xid);
}
+   /**
+    * Creates the info object for a prepared transaction with an optional creation time.
+    *
+    * @param txId the transaction id
+    * @param xid the transaction's Xid
+    * @param ct the creation time, or <code>null</code> to keep the field's initial value
+    */
+   public PreparedTxInfo(long txId, Xid xid, Long ct) {
+      setTxId(txId);
+      setXid(xid);
+      if (ct == null)
+      {
+         // No creation time supplied: leave the field untouched
+         return;
+      }
+      creationTime = ct.longValue();
+   }
// Public ------------------------------------------------------------------
@@ -63,6 +73,11 @@
public void setXid(Xid xid) {
this.xid = xid;
}
+
+ public long getCreationTime() // returns the creationTime field as stored
+ {
+ return creationTime;
+ }
// Object overrides --------------------------------------------------------
Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java 2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java 2011-05-16 08:44:04 UTC (rev 8301)
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
@@ -63,6 +64,8 @@
private Map callbackMap;
private boolean recoveredFromStorage;
+
+ private long creationTime;
/**
* If this is a XA transaction, when a commit is executed the transaction has to be removed from the transaction repository.
@@ -122,6 +125,7 @@
state = STATE_ACTIVE;
callbacks = new ArrayList();
callbackMap = new HashMap();
+ creationTime = System.currentTimeMillis();
}
Transaction(long id, Xid xid, TransactionRepository tr)
@@ -130,6 +134,12 @@
this.xid = xid;
this.repository = tr;
}
+
+ Transaction(long id, Xid xid, TransactionRepository tr, long ct) // ct: creation time to use; NOTE(review): presumably a previously stored value - confirm
+ {
+ this(id, xid, tr);
+ creationTime = ct; // assigned after the delegated constructor, so the supplied value is the one kept
+ }
// Public --------------------------------------------------------
@@ -194,24 +204,51 @@
boolean onePhase = state != STATE_PREPARED;
- if (firstCallback != null)
+ Iterator iter = null;
+
+ try
{
- firstCallback.beforeCommit(onePhase);
+
+ if (firstCallback != null)
+ {
+ firstCallback.beforeCommit(onePhase);
+ }
+
+ iter = callbacks.iterator();
+
+ while (iter.hasNext())
+ {
+ TxCallback callback = (TxCallback)iter.next();
+
+ callback.beforeCommit(onePhase);
+ }
+
+ state = STATE_COMMITTED;
+
+ if (trace)
+ {
+ log.trace(this + " committed");
+ }
+
}
-
- Iterator iter = callbacks.iterator();
-
- while (iter.hasNext())
+ catch (Exception e)
{
- TxCallback callback = (TxCallback)iter.next();
-
- callback.beforeCommit(onePhase);
+ // for one-phase commit, we need to rollback the message.
+ if (onePhase)
+ {
+ if (trace)
+ {
+ log.trace(this + " one-phase commit results in rollback.");
+ }
+
+ rollback();
+
+ throw new XAException(XAException.XA_RBOTHER);
+ }
+ // for 2-pc commit, we throw the exception
+ throw e;
}
- state = STATE_COMMITTED;
-
- if (trace) { log.trace(this + " committed"); }
-
iter = callbacks.iterator();
if (trace) { log.trace(this + " executing after commit hooks"); }
@@ -288,8 +325,13 @@
if (trace) { log.trace(this + " prepare process complete"); }
}
- public synchronized void rollback() throws Exception
+ public void rollback() throws Exception // delegates to rollback(boolean)
{
+ rollback(false); // recovered = false
+ }
+
+ public synchronized void rollback(boolean recovered) throws Exception
+ {
if (state == STATE_COMMITTED)
{
throw new TransactionException("Transaction already committed, cannot rollback");
@@ -336,7 +378,15 @@
for(Iterator i = callbacks.iterator(); i.hasNext();)
{
TxCallback callback = (TxCallback)i.next();
- callback.afterRollback(onePhase);
+
+ if (callback instanceof TxCallbackEx)
+ {
+ ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered || onePhase);
+ }
+ else
+ {
+ callback.afterRollback(onePhase);
+ }
}
callbacks = null;
@@ -385,6 +435,20 @@
{
this.state = state;
}
+
+   /**
+    * Returns the time elapsed since the creation time, in milliseconds.
+    *
+    * @return <code>System.currentTimeMillis() - creationTime</code>, or
+    *         <code>Long.MIN_VALUE</code> when the creation time itself is
+    *         <code>Long.MIN_VALUE</code>
+    */
+   public long getAge()
+   {
+      // Long.MIN_VALUE is passed through unchanged instead of being subtracted
+      return creationTime == Long.MIN_VALUE
+         ? Long.MIN_VALUE
+         : System.currentTimeMillis() - creationTime;
+   }
+
+ public long getCreationTime() // returns the creationTime field as stored
+ {
+ return creationTime;
+ }
public String toString()
{
More information about the jboss-cvs-commits
mailing list