[jboss-cvs] JBoss Messaging SVN: r8301 - in branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss: jms/client/delegate and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon May 16 04:44:05 EDT 2011


Author: bershath27
Date: 2011-05-16 04:44:04 -0400 (Mon, 16 May 2011)
New Revision: 8301

Modified:
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java
   branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
Log:
JBPAPP-6386



Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -150,8 +150,6 @@
    public Object handleStop(Invocation invocation) throws Throwable
    {
       ConnectionState currentState = getConnectionState(invocation);
-      // If the session is being sent to the Cache, we need to clear the Thread because of the ContextClassLoader
-      currentState.clearExecutors();
       currentState.setStarted(false);
       currentState.setJustCreated(false);
       return invocation.invokeNext();

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -122,10 +122,16 @@
       int ackMode = ((Integer)mi.getArguments()[1]).intValue();
       boolean xa = ((Boolean)mi.getArguments()[2]).booleanValue();
 
+      boolean isCC = false;
+      if (mi.getArguments().length >= 4)
+      {
+         isCC = ((Boolean)mi.getArguments()[3]).booleanValue();
+      }
+
       SessionState sessionState =
          new SessionState(connectionState, sessionDelegate, transacted,
                           ackMode, xa, sessionDelegate.getDupsOKBatchSize(), 
-                          connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName());
+                          connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName(), isCC);
 
       delegate.setState(sessionState);
       return delegate;

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -44,6 +44,7 @@
 import org.jboss.jms.wireformat.CloseRequest;
 import org.jboss.jms.wireformat.ClosingRequest;
 import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
+import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest2;
 import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
 import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
 import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
@@ -179,9 +180,7 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   public SessionDelegate createSessionDelegate(boolean transacted,
-                                                int acknowledgmentMode,
-                                                boolean isXA) throws JMSException
+   public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
    {
       RequestSupport req =
          new ConnectionCreateSessionDelegateRequest(id, version, transacted,
@@ -190,7 +189,18 @@
       return (SessionDelegate)doInvoke(client, req);
    }
 
+   public SessionDelegate createSessionDelegate(boolean transacted,
+                                                int acknowledgmentMode,
+                                                boolean isXA, boolean isCC) throws JMSException
+   {
+      RequestSupport req =
+         new ConnectionCreateSessionDelegateRequest2(id, version, transacted,
+                                                    acknowledgmentMode, isXA, isCC);
 
+      return (SessionDelegate)doInvoke(client, req);
+   }
+
+
    public String getClientID() throws JMSException
    {
       RequestSupport req = new ConnectionGetClientIDRequest(id, version);

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -172,7 +172,7 @@
          ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
             createSessionDelegate(sessionState.isTransacted(),
                                   sessionState.getAcknowledgeMode(),
-                                  sessionState.isXA());
+                                  sessionState.isXA(), sessionState.isCC());
 
          sessionDelegate.synchronizeWith(newSessionDelegate);
       }
@@ -183,25 +183,6 @@
    }
 
    // Public ---------------------------------------------------------------------------------------
-
-   /** 
-    * 
-    * Session Executors will store the ClassLoader on the threads they were created.
-    * When a JMSSession is reused by JCA it can reuse it on different application context (i.e. different ClassLoaders)
-    * This method should be called before the Session is stored on the JCA's cache
-    * @throws InterruptedException 
-    * 
-    * */
-   public void clearExecutors() throws InterruptedException
-   {
-      Iterator iterator = getChildren().iterator();
-      while (iterator.hasNext())
-      {
-         SessionState state = (SessionState) iterator.next();
-         state.clearExecutorThread();
-      }
-      
-   }
    
    public ResourceManager getResourceManager()
    {

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/client/state/SessionState.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -126,11 +126,13 @@
    
    private String defaultOrderingGroupName;
    
+   private boolean isCC;
+   
    // Constructors ---------------------------------------------------------------------------------
-
+   
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
                        boolean transacted, int ackMode, boolean xa,
-                       int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName)
+                       int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName, boolean isCC)
    {
       super(parent, (DelegateSupport)delegate);
 
@@ -140,6 +142,7 @@
       this.acknowledgeMode = ackMode;
       this.transacted = transacted;
       this.xa = xa;
+      this.isCC = isCC;
       
       this.dupsOKBatchSize = dupsOKBatchSize;
 
@@ -380,11 +383,6 @@
    }
    
    // Public ---------------------------------------------------------------------------------------
-
-   public void clearExecutorThread() throws InterruptedException
-   {
-      this.executor.clearClassLoader();
-   }
    
    public void setTreatAsNonTransactedWhenNotEnlisted(boolean b)
    {
@@ -439,6 +437,11 @@
       return xa;
    }
 
+   public boolean isCC()
+   {
+      return isCC;
+   }
+   
    public MessagingXAResource getXAResource()
    {
       return xaResource;

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -43,6 +43,10 @@
                                          int acknowledgmentMode,
                                          boolean isXA) throws JMSException;
 
+   SessionDelegate createSessionDelegate(boolean transacted,
+                                         int acknowledgmentMode,
+                                         boolean isXA, boolean isCC) throws JMSException;
+
    String getClientID() throws JMSException;
 
    void setClientID(String id) throws JMSException;

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -224,16 +224,21 @@
 
    // ConnectionDelegate implementation ------------------------------------------------------------
 
+   public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
+   {
+      return createSessionDelegate(transacted, acknowledgmentMode, isXA, false);
+   }
+
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
-                                                boolean isXA)
+                                                boolean isXA, boolean isCC)
       throws JMSException
    {
       try
       {
          log.trace(this + " creating " + (transacted ? "transacted" : "non transacted") +
             " session, " + Util.acknowledgmentMode(acknowledgmentMode) + ", " +
-            (isXA ? "XA": "non XA"));
+            (isXA ? "XA": "non XA") + ", " + (isCC ? "CC" : "non CC"));
 
          if (closed)
          {
@@ -248,6 +253,11 @@
          //Note we only replicate transacted and client acknowledge sessions.
          ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
          		                     transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+         
+         if (isCC)
+         {
+            ep.setCC();
+         }
 
          synchronized (sessions)
          {
@@ -364,6 +374,11 @@
 
    public void close() throws JMSException
    {
+      close(false);
+   }
+
+   public void close(boolean isFromFailure) throws JMSException
+   {
       try
       {
          //reason for synchronization
@@ -392,7 +407,7 @@
          {
             ServerSessionEndpoint sess = (ServerSessionEndpoint)i.next();
 
-            sess.localClose();
+            sess.localClose(isFromFailure);
          }
 
          sessions.clear();
@@ -522,12 +537,14 @@
 
             // for 2pc rollback - we just don't cancel any messages back to the channel; this is
             // driven from the client side.
+            // if the transaction rollback comes from recovery, we need to cancel the messages
+            // see JBMESSAGING-1786
 
             Transaction tx =  tr.getPreparedTx(request.getXid());
 
             if (trace) { log.trace(this + " rolling back " + tx); }
 
-            tx.rollback();
+            tx.rollback(request.getState().isRecovered());
          }
 
          if (trace) { log.trace(this + " processed transaction successfully"); }

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -21,6 +21,8 @@
  */
 package org.jboss.jms.server.endpoint;
 
+import java.util.List;
+
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -35,10 +37,13 @@
 import org.jboss.jms.server.selector.Selector;
 import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.Channel;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.contract.Receiver;
@@ -322,6 +327,26 @@
             firstTime = false;
          }         
          
+         //For a remote sucker, we need to update the message's state
+         if (remote)
+         {
+            PersistenceManager pm = sessionEndpoint.getPersistenceManager();
+            if (ref.getMessage().isReliable() && messageQueue.isRecoverable())
+            {
+               try
+               {
+                  pm.updateMessageState(messageQueue.getChannelID(), ref, "S");
+               }
+               catch (Exception e)
+               {
+                  //We need to stop the sucking process; the message should be re-delivered.
+                  log.error("Failed to update state for message: " + ref, e);
+                  return null;
+               }
+            }
+            delivery.setSucked(true);
+         }
+         
          try
          {
          	sessionEndpoint.handleDelivery(delivery, this);
@@ -539,8 +564,13 @@
                   
          ServerPeer sp = sessionEndpoint.getConnectionEndpoint().getServerPeer();
          
-         Queue queue = postOffice.getBindingForQueueName(queueName).queue;        
+         Binding binding = postOffice.getBindingForQueueName(queueName);
          
+         //https://jira.jboss.org/jira/browse/JBMESSAGING-1801
+         if (binding == null) return;
+         
+         Queue queue = binding.queue;        
+         
          ManagedDestination mDest = sp.getDestinationManager().getDestination(destination.getName(), false);
          
          if (!queue.isRecoverable())
@@ -647,6 +677,16 @@
       sessionEndpoint.promptDelivery(messageQueue);
    }
 
+   public long getChannelID()
+   {
+      return messageQueue.getChannelID();
+   }
+
+   public Channel getChannel()
+   {
+      return messageQueue;
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
@@ -74,11 +75,13 @@
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PersistenceManager.ReferenceInfo;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.core.impl.IDManager;
 import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.SimpleDelivery;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionException;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -187,7 +190,14 @@
 
    private long lastSequence = -1;
 
+   private Map<Long, Long> failureCanceledDels;
 
+   private AtomicBoolean isSuckerSession = new AtomicBoolean(false);
+   
+   private boolean isCC = false;
+   
+   private boolean markClose = false;
+
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -236,6 +246,8 @@
       defaultRedeliveryDelay = sp.getDefaultRedeliveryDelay();
 
       deliveries = new ConcurrentHashMap();
+      
+      failureCanceledDels = new HashMap<Long, Long>();
    }
 
    // SessionDelegate implementation ---------------------------------------------------------------
@@ -333,6 +345,30 @@
 
    public void close() throws JMSException
    {
+      if (isCC)
+      {
+         markClose = true;
+         checkClose();
+      }
+      else
+      {
+         trueClose();
+      }
+   }
+   
+   public void checkClose() throws JMSException
+   {
+      if (trace) {log.trace(this + " checking cc closing: " + markClose + " counter: " + deliveries.size()); }
+      
+      if (markClose && deliveries.size() == 0)
+      {
+         if (trace) {log.trace(this + " closing CC session now.");}
+         trueClose();
+      }
+   }
+   
+   public void trueClose() throws JMSException
+   {
       try
       {
          localClose();
@@ -1133,9 +1169,14 @@
          }
       }
    }
-
+   
    void localClose() throws Throwable
    {
+      localClose(false);
+   }
+
+   void localClose(boolean isFromFailure) throws Throwable
+   {
            
       if (closed)
       {
@@ -1154,9 +1195,13 @@
          consumersClone = new HashMap(consumers);
       }
 
+      List<Channel> curChannels = new ArrayList<Channel>();
+      
       for( Iterator i = consumersClone.values().iterator(); i.hasNext(); )
       {
-         ((ServerConsumerEndpoint)i.next()).localClose();
+         ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
+         curChannels.add(consumer.getChannel());
+         consumer.localClose();
       }
 
       consumers.clear();
@@ -1213,6 +1258,33 @@
 
             DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
+            // https://jira.jboss.org/browse/JBMESSAGING-1828
+            if (rec == null)
+            {
+               continue;
+            }
+
+            //For a sucked delivery, we need to update the state back to 'C'
+            if (rec.del.isSucked())
+            {
+               //We need to revert the message state (if it is still there). If the revert fails,
+               //we don't cancel the delivery.
+               if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+               {
+                  try
+                  {
+                     //now ask pm to do it.
+                     pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+                  }
+                  catch (Exception e)
+                  {
+                     //If the update failed, it must be a DB failure; we log the error, skip this delivery and let the others be canceled
+                     log.error("Failed to update message " + rec.del.getReference() + " to state C", e);
+                     continue;
+                  }
+               }
+            }
+
             /*
              * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
              */
@@ -1222,7 +1294,54 @@
             }
 
             channels.add(rec.del.getObserver());
+            
+            if (isFromFailure)
+            {
+               failureCanceledDels.put(rec.deliveryID, rec.deliveryID);
+            }
          }
+         
+         if (isSuckerSession.get())
+         {
+            //Here we handle the rare case where a sucker acked a message but then crashed,
+            //so the message was never updated to the target channel and the session has already
+            //forgotten it. We take this chance
+            //to load those messages back into their channels and redeliver them.
+            for (Channel ch : curChannels)
+            {
+               List<ReferenceInfo> refs = pm.claimMessagesInSuck(ch.getChannelID());
+               
+               if (refs.size() > 0)
+               {
+                  List<Long> mids = new ArrayList<Long>();
+                  Map<Long, ReferenceInfo> refInfoMap = new HashMap<Long, ReferenceInfo>();
+               
+                  for (ReferenceInfo refInfo : refs)
+                  {
+                     mids.add(refInfo.getMessageId());
+                     refInfoMap.put(refInfo.getMessageId(), refInfo);
+                  }
+               
+                  List messages = pm.getMessages(mids);
+                  
+                  iter = messages.iterator();
+                  
+                  while (iter.hasNext())
+                  {
+                     Message m = (Message)iter.next();
+                     MessageReference mref = ms.reference(m);
+                     
+                     ReferenceInfo mInfo = refInfoMap.get(m.getMessageID());
+                     mref.setDeliveryCount(mInfo.getDeliveryCount());
+                     mref.setScheduledDeliveryTime(mInfo.getScheduledDelivery());
+                     
+                     Delivery del = new SimpleDelivery(ch, mref, true, true);
+                     del.cancel();
+                  }
+                  channels.add(ch);
+               }
+            }
+         }
 
          promptDelivery(channels);
 
@@ -1600,19 +1719,40 @@
       synchronized (deliveries)
       {
          rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+         if (rec == null)
+         {
+            //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
+            //and has failed over since recoverDeliveries won't have been called
+            if (trace)
+            {
+               log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+            }
+            return null;
+         }
+         //now we check for suckers
+         if (rec.del.isSucked())
+         {
+            //We need to revert the message state (if it is still there). If the revert fails,
+            //we don't cancel the delivery.
+            if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+            {
+               try
+               {
+                  //now ask pm to do it.
+                  pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+               }
+               catch (Exception e)
+               {
+                  if (trace)
+                  {
+                     log.trace("Failed to update message " + rec.del.getReference() + " to state C");
+                  }
+                  return null;
+               }
+            }
+         }
       }
 
-      if (rec == null)
-      {
-         //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
-      	//and has failed over since recoverDeliveries won't have been called
-      	if (trace)
-      	{
-      		log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
-      	}
-      	return null;
-      }
-
       //Note we check the flag *and* evaluate again, this is because the server and client clocks may
       //be out of synch and don't want to send back to the client a message it thought it has sent to
       //the expiry queue
@@ -1666,6 +1806,11 @@
 
       //Need to send a message to the replicant to remove the id
       postOffice.sendReplicateAckMessage(rec.queueName, del.getReference().getMessage().getMessageID());
+      
+      if (isCC)
+      {
+         checkClose();
+      }
 
       return rec.del;
    }
@@ -1762,15 +1907,29 @@
       synchronized (deliveries)
       {
          rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+         if (rec == null)
+         {
+            //This can happen in one of the two cases:
+            //
+            //1. If an ack comes in after failover, or
+            //2. The session is closed due to server side connection failure notification processing.
+            //When a connection failure is detected at the server end, it will close all related server side
+            //sessions. As part of closing, any un-acked message will be canceled. 
+            //If a normal client side ack comes in just after the session has been closed this way, and the
+            //delivery being acked has just been canceled, the client side ack will end up here.
+            //
+            //We treat the cases differently. For case 1, we can safely ignore it.
+            //For case 2, we must throw an exception to indicate that the ack failed and the message will be re-delivered.
+            if (failureCanceledDels.remove(ack.getDeliveryID()) != null)
+            {
+               //ack should fail
+               throw new JMSException("Message already canceled before this ack " + ack + " and the message will be redelivered.");
+            }
+            log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+            return false;            
+         }
       }
 
-      if (rec == null)
-      {
-      	//This can happen if an ack comes in after failover
-         log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
-         return false;
-      }
-
       ServerConsumerEndpoint consumer = rec.getConsumer();
 
       if (consumer != null && consumer.isRemote())
@@ -1791,6 +1950,11 @@
       }
 
       if (trace) { log.trace(this + " acknowledged delivery " + ack); }
+      
+      if (isCC)
+      {
+         checkClose();
+      }
 
       return true;
    }
@@ -1829,12 +1993,12 @@
 
       JBossDestination dest = new JBossQueue(queueName);
 
-      //We don't care about redelivery delays and number of attempts for a direct consumer
-
+      //We don't care about redelivery delays for a direct consumer
+      //We do care about number of attempts, see JBMESSAGING-1774
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, binding.queue,
                                     binding.queue.getName(), this, selectorString, false,
-                                    dest, null, null, 0, -1, true, false, prefetchSize);
+                                    dest, null, null, 0, 1, true, false, prefetchSize);
 
       ConsumerAdvised advised;
 
@@ -1856,6 +2020,8 @@
       }
 
       log.trace(this + " created and registered " + ep);
+      
+      isSuckerSession.set(true);
 
       return stub;
    }
@@ -2392,14 +2558,66 @@
             		throw new TransactionException("Failed to handle send ack", e);
             	}
             }
+
+            if (isCC && del != null)
+            {
+               try
+               {
+                  checkClose();
+               }
+               catch (JMSException e)
+               {
+                  //we don't need to do anything here.
+                  log.warn("Exception closing a CC session " + this);
+               }
+            }
          }
       }
 
       public void afterRollback(boolean onePhase) throws TransactionException
       {
-         //One phase rollbacks never hit the server - they are dealt with locally only
-         //so this would only ever be executed for a two phase rollback.
+         if (log.isTraceEnabled()) { log.trace(this + " afterRollback, onePhase: " + onePhase); }
+         //One phase rollbacks usually don't hit the server - they are dealt with locally only,
+         //but if a one-phase commit fails, we need to roll back the delivery
+         if (onePhase)
+         {
+            // Remove the deliveries from the delivery map.
+            Iterator iter = delList.iterator();
+            while (iter.hasNext())
+            {
+               Long deliveryId = (Long)iter.next();
 
+               DeliveryRecord del = (DeliveryRecord)deliveries.remove(deliveryId);
+
+               if (del != null && del.replicating)
+               {
+                  //TODO - we could batch this in one message
+                  try
+                  {
+                     postOffice.sendReplicateAckMessage(del.queueName, del.del.getReference().getMessage().getMessageID());
+                  }
+                  catch (Exception e)
+                  {
+                     throw new TransactionException("Failed to handle send ack", e);
+                  }
+               }
+
+               if (isCC && del != null)
+               {
+                  try
+                  {
+                     checkClose();
+                  }
+                  catch (JMSException e)
+                  {
+                     //we don't need to do anything here.
+                     log.warn("Exception closing a CC session " + this);
+                  }
+               }
+            }
+         }
+         
+         //for a two phase rollback.
          //We don't do anything since cancellation is driven from the client.
       }
 
@@ -2409,4 +2627,19 @@
       }
    }
 
+   public PersistenceManager getPersistenceManager()
+   {
+      return pm;
+   }
+
+   public void setCC()
+   {
+      isCC = true;
+   }
+   
+   public boolean isCC()
+   {
+      return isCC;
+   }
+
 }

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -68,11 +68,16 @@
       return endpoint.closing(sequence);
    }
 
+   public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
+   {
+      return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA);
+   }
+
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
-                                                boolean isXA) throws JMSException
+                                                boolean isXA, boolean isCC) throws JMSException
    {
-      return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA);
+      return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA, isCC);
    }
 
    public String getClientID() throws JMSException

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -63,6 +63,8 @@
    private List sessionStatesList;
 
    private boolean clientSide;
+   
+   private boolean recovered;
 
    // Static --------------------------------------------------------
 
@@ -70,7 +72,13 @@
 
    public ClientTransaction()
    {
+      this(false);
+   }
+   
+   public ClientTransaction(boolean isRecovered)
+   {
       clientSide = true;
+      recovered = isRecovered;
    }
 
    // Public --------------------------------------------------------
@@ -215,6 +223,17 @@
       }
    }
 
+   public boolean isRecovered()
+   {
+      return recovered;
+   }
+
+   public void setRecovered(boolean b)
+   {
+      recovered = b;
+   }
+
+
    // Streamable implementation ---------------------------------
 
    public void write(DataOutputStream out) throws Exception
@@ -272,6 +291,7 @@
             out.writeLong(Long.MIN_VALUE);
          }
       }
+      out.writeBoolean(recovered);
    }
 
 
@@ -315,6 +335,8 @@
             sessionState.addAck(new DefaultAck(l));
          }
       }
+      
+      recovered = in.readBoolean();
    }
 
    // Protected -----------------------------------------------------

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/jms/wireformat/PacketSupport.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -91,6 +91,7 @@
    public static final int REQ_CONNECTION_STOP = 205;
    public static final int REQ_CONNECTION_SENDTRANSACTION = 206;
    public static final int REQ_CONNECTION_GETPREPAREDTRANSACTIONS = 207;
+   public static final int REQ_CONNECTION_CREATESESSIONDELEGATE2 = 208;
    
    // Session
    // -------
@@ -226,6 +227,9 @@
          case REQ_CONNECTION_CREATESESSIONDELEGATE:
             packet = new ConnectionCreateSessionDelegateRequest();
             break;
+         case REQ_CONNECTION_CREATESESSIONDELEGATE2:
+            packet = new ConnectionCreateSessionDelegateRequest2();
+            break;
          case REQ_CONNECTION_GETCLIENTID:
             packet = new ConnectionGetClientIDRequest();
             break;

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/Delivery.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -53,4 +53,8 @@
     * Mark if this delivery is with a prepared XA transaction.
     */
    boolean isXAPrepared();
+   
+   boolean isSucked();
+   
+   void setSucked(boolean isSucked);
 }

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -87,6 +87,24 @@
 
    void addTransaction(Transaction tx);
 
+   //drop all messages that belong to a channel.
+   void dropChannelMessages(long channelID) throws Exception;
+
+   //merge messages from one channel to another.
+   void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+   //set whether storing the transaction creation time is supported.
+   void setSupportsTxAge(boolean supportsTxAge);
+
+   //whether storing the transaction creation time is supported
+   boolean supportsTxAge();
+
+   //update the state of the message's reference in the given channel
+   void updateMessageState(long channelID, MessageReference ref, String state) throws Exception;
+
+   //claim the channel's messages that are in state 'S' by setting their state to 'C', and return their reference infos
+   List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception;
+
    // Interface value classes ----------------------------------------------------------------------
 
    class MessageChannelPair

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -36,7 +36,7 @@
 import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 
-import javax.jms.Session;
+import javax.jms.JMSException;
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.Xid;
@@ -53,6 +53,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:hgao at jboss.org">Howard Gao</a>
  *
  * @version <tt>1.1</tt>
  *
@@ -102,8 +103,9 @@
    private int idCacheCounter = 0;
 
    private final int idCacheSize;
+   
+   private boolean supportsTxAge;
 
-
    // Constructors --------------------------------------------------
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -322,6 +324,10 @@
       ResultSet rs = null;
       PreparedTxInfo txInfo = null;
       TransactionWrapper wrap = new TransactionWrapper();
+      
+      PreparedStatement exst = null;
+      ResultSet exrs = null;
+      Map<Long, Long> txTimes = new HashMap<Long, Long>();
 
       try
       {
@@ -329,6 +335,27 @@
 
          conn = ds.getConnection();
 
+         if (supportsTxAge)
+         {
+            try
+            {
+               exst = conn.prepareStatement(getSQLStatement("SELECT_TRANSACTION_START_TIME_EXTRA"));
+               exrs = exst.executeQuery();
+            
+               while (exrs.next())
+               {
+                  long txId = exrs.getLong(1);
+                  long txTime = exrs.getLong(2);
+                  txTimes.put(txId, txTime);
+               }
+            }
+            finally
+            {
+               closeResultSet(exrs);
+               closeStatement(exst);
+            }
+         }
+
          st = conn
                .prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
 
@@ -350,7 +377,7 @@
             Xid xid = new MessagingXid(branchQual, formatId, globalTxId);
 
             // create a tx info object with the result set detailsdetails
-            txInfo = new PreparedTxInfo(txId, xid);
+            txInfo = new PreparedTxInfo(txId, xid, txTimes.get(txId));
 
             transactions.add(txInfo);
          }
@@ -1134,6 +1161,169 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+    */
+   public void dropChannelMessages(final long channelID) throws Exception
+   {
+      class ChannelDropRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+            
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+               int rows;
+               
+               ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+               while (rs.next())
+               {
+                  long mid = rs.getLong(1);
+                  ps2.setLong(1, mid);
+                  ps2.executeUpdate();
+               }
+               ps2.close();
+               
+               ps.close();
+               
+               ps = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+               ps.setLong(1, channelID);
+               rows = ps.executeUpdate();
+               
+               if (trace)
+               {
+                  log.trace("Update page ord updated " + rows + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+      
+      new ChannelDropRunner().executeWithRetry();
+   }
+
+   /* (non-Javadoc)
+    * load messages from the channel (fromID) in the DB and
+    * add the messages to the channel (toID)
+    * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+    */
+   public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+   {
+
+      if (fromID == toID) { throw new IllegalArgumentException(
+            "Cannot merge queues - they have the same channel id!!"); }
+
+      class ChannelMergeRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            ResultSet rs = null;
+            PreparedStatement ps2 = null;
+
+            try
+            {
+               //first get max page order of toID channel
+               ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+
+               ps.setLong(1, toID);
+
+               rs = ps.executeQuery();
+
+               rs.next();
+
+               Long maxOrdering = new Long(rs.getLong(2));
+
+               long pageCount = 0;
+
+               if (rs.wasNull())
+               {
+                  maxOrdering = null;
+               }
+               else
+               {
+                  //If maxOrdering is not null, update the page order
+                  pageCount = maxOrdering + 1;
+               }
+
+               rs.close();
+
+               ps.close();
+               
+               if (pageCount > 0)
+               {
+                  //update paging
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+                  
+                  ps2 = conn
+                     .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+                  ps.setLong(1, fromID);
+
+                  rs = ps.executeQuery();
+
+                  while (rs.next())
+                  {
+                     long msgId = rs.getLong(1);
+
+                     ps2.setLong(1, pageCount);
+
+                     ps2.setLong(2, msgId);
+
+                     ps2.setLong(3, fromID);
+                     
+                     int rows = ps2.executeUpdate();
+
+                     if (trace)
+                     {
+                        log.trace("Update page ord updated " + rows + " rows");
+                     }
+
+                     pageCount++;
+                  }
+                  ps2.close();
+                  ps.close();
+               }
+               
+               //update channel id
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+               ps.setLong(1, toID);
+
+               ps.setLong(2, fromID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Update channel id updated " + rows + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+      
+      new ChannelMergeRunner().executeWithRetry();
+   }
+
    public InitialLoadInfo mergeAndLoad(final long fromChannelID,
          final long toChannelID, final int numberToLoad,
          final long firstPagingOrder, final long nextPagingOrder)
@@ -1448,6 +1638,12 @@
                {
                   log.trace("Updated " + rows + " rows");
                }
+               
+               if (rows == 0)
+               {
+                  //no row was updated; the message has presumably been cancelled back already
+                  throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+               }
 
                return null;
             }
@@ -2188,6 +2384,36 @@
          }
          closeStatement(ps);
       }
+      
+      if (supportsTxAge)
+      {
+         try
+         {
+            statement = getSQLStatement("INSERT_TRANSACTION_EXTRA");
+
+            ps = conn.prepareStatement(statement);
+
+            ps.setLong(1, tx.getId());
+
+            ps.setLong(2, tx.getCreationTime());
+
+            rows = ps.executeUpdate();
+         }
+         finally
+         {
+            if (trace)
+            {
+               String s = JDBCUtil.statementToString(statement,
+                                                     new Integer(nodeID),
+                                                     new Long(tx.getId()),
+                                                     "<byte-array>",
+                                                     new Integer(formatID),
+                                                     "<byte-array>");
+               log.trace(s + (rows == -1 ? " failed!" : " inserted " + rows + " row(s)"));
+            }
+            closeStatement(ps);
+         }
+      }
    }
 
    protected void removeTXRecord(Connection conn, Transaction tx)
@@ -2222,6 +2448,30 @@
       {
          closeStatement(ps);
       }
+      
+      if (supportsTxAge)
+      {
+         try
+         {
+            ps = conn.prepareStatement(getSQLStatement("DELETE_TRANSACTION_EXTRA"));
+
+            ps.setLong(1, tx.getId());
+
+            int rows = ps.executeUpdate();
+
+            if (trace)
+            {
+               log.trace(JDBCUtil.statementToString(
+                     getSQLStatement("DELETE_TRANSACTION_EXTRA"), new Integer(nodeID),
+                     new Long(tx.getId()))
+                     + " removed " + rows + " row(s)");
+            }
+         }
+         finally
+         {
+            closeStatement(ps);
+         }
+      }
    }
 
    protected void addReference(long channelID, MessageReference ref,
@@ -2525,8 +2775,7 @@
 
             if (i == null) { return null; }
 
-            is = new BufferedInputStream(rs.getBinaryStream(columnIndex),
-                  BUFFER_SIZE);
+            is = new BufferedInputStream(i, BUFFER_SIZE);
 
             os = new ByteArrayOutputStream(BUFFER_SIZE);
 
@@ -2597,6 +2846,11 @@
               "CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
       // Id cache
       map.put("CREATE_ID_CACHE", "CREATE TABLE JBM_ID_CACHE (NODE_ID INTEGER, CNTR INTEGER, JBM_ID VARCHAR(255), PRIMARY KEY(NODE_ID, CNTR))");
+
+      // Transaction Extra
+      //https://jira.jboss.org/jira/browse/JBMESSAGING-1772
+      map.put("CREATE_TRANSACTION_EXTRA", "CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID))");
+
       return map;
    }
 
@@ -2668,6 +2922,10 @@
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("DELETE_MESSAGE",
               "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+      map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+      map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?");
+
+      
       // Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
@@ -2691,6 +2949,16 @@
       // Other
       map.put("SELECT_ALL_CHANNELS",
               "SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF");
+      
+      //Transaction Extra
+      map.put("SELECT_TRANSACTION_START_TIME_EXTRA", "SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX");
+      map.put("INSERT_TRANSACTION_EXTRA", "INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)");
+      map.put("DELETE_TRANSACTION_EXTRA", "DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?");
+      
+      //used by the sucker
+      map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
+      map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
+      map.put("LOAD_REFS_IN_SUCK", "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
 
       return map;
    }
@@ -3043,4 +3311,107 @@
       }
    }
 
+   public void setSupportsTxAge(boolean supportsTxAge)
+   {
+      this.supportsTxAge = supportsTxAge;
+   }
+   
+   public boolean supportsTxAge()
+   {
+      return supportsTxAge;
+   }
+
+   public void updateMessageState(final long channelID, final MessageReference ref, final String c) throws Exception
+   {
+      class UpdateMessageStateRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psReference = null;
+
+            try
+            {
+               psReference = conn
+                     .prepareStatement(getSQLStatement("UPDATE_MESSAGE_STATE"));
+
+               psReference.setString(1, c);
+
+               psReference.setLong(2, ref.getMessage().getMessageID());
+
+               psReference.setLong(3, channelID);
+
+               int rows = psReference.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Updated " + rows + " rows");
+               }
+               
+               if (rows != 1)
+               {
+                  throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psReference);
+            }
+         }
+      }
+
+      new UpdateMessageStateRunner().executeWithRetry();
+   }
+
+   public List<ReferenceInfo> claimMessagesInSuck(final long channelID) throws Exception
+   {
+      final List<ReferenceInfo> msgIDs = new ArrayList<ReferenceInfo>();
+      
+      class MessageClaimRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+            
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS_IN_SUCK"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+               
+               while (rs.next())
+               {
+                  long msgId = rs.getLong(1);
+                  int deliveryCount = rs.getInt(2);
+                  long sched = rs.getLong(3);
+
+                  ps2 = conn.prepareStatement(getSQLStatement("CLAIM_MESSAGE_IN_SUCK"));
+                  ps2.setLong(1, channelID);
+                  ps2.setLong(2, msgId);
+                  int rows = ps2.executeUpdate();
+                  
+                  if (trace)
+                  {
+                     log.trace("Message in suck claimed " + rows + " rows for message " + msgId);
+                  }
+
+                  msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return msgIDs;
+         }
+      }
+      new MessageClaimRunner().executeWithRetry();
+      return msgIDs;
+   }
+
 }

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -116,6 +116,15 @@
       // NOOP
    }
 
+   public void setSupportsTxAge(boolean supportsTxAge)
+   {
+   }
+
+   public boolean supportsTxAge()
+   {
+      return false;
+   }
+
    public void addTransaction(Transaction tx)
    {
       //To change body of implemented methods use File | Settings | File Templates.
@@ -211,6 +220,23 @@
       return timeMark;
    }
 
+   public void dropChannelMessages(long channelID) throws Exception
+   {
+   }
+
+   public void mergeChannelMessage(long fromID, long toID) throws Exception
+   {
+   }
+
+   public void updateMessageState(long channelID, MessageReference ref, String state) throws Exception
+   {
+   }
+   
+   public List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception
+   {
+      return Collections.emptyList();
+   }
+
 }
 
 class IDCounter

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -51,6 +51,7 @@
    private MessageReference reference;   
    private boolean recovered;
    private Transaction tx;
+   private boolean isSucked;
 
    private boolean trace = log.isTraceEnabled();
 
@@ -136,6 +137,16 @@
       return false;
    }
 
+   public void setSucked(boolean sucked)
+   {
+      isSucked = sucked;
+   }
+   
+   public boolean isSucked()
+   {
+      return isSucked;
+   }
+
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/PreparedTxInfo.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -36,6 +36,8 @@
 	private long txId;
 
 	private Xid xid = null;
+	
+	private long creationTime = Long.MIN_VALUE;
 
 
    // Constructors ------------------------------------------------------------
@@ -45,6 +47,14 @@
 		setXid(xid);
 	}
 
+   public PreparedTxInfo(long txId, Xid xid, Long ct) {
+      setTxId(txId);
+      setXid(xid);
+      if (ct != null)
+      {
+         creationTime = ct;
+      }
+   }
 
    // Public ------------------------------------------------------------------
 
@@ -63,6 +73,11 @@
 	public void setXid(Xid xid) {
 		this.xid = xid;
 	}
+	
+	public long getCreationTime()
+	{
+	   return creationTime;
+	}
 
 
    // Object overrides --------------------------------------------------------

Modified: branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
===================================================================
--- branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-05-13 14:19:52 UTC (rev 8300)
+++ branches/JBossMessaging_1_4_6_GA_JBPAPP-6386/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-05-16 08:44:04 UTC (rev 8301)
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
 
 import org.jboss.logging.Logger;
@@ -63,6 +64,8 @@
    private Map callbackMap;
    
    private boolean recoveredFromStorage;
+   
+   private long creationTime;
          
    /**
     * If this is a XA transaction, when a commit is executed the transaction has to be removed from the transaction repository.
@@ -122,6 +125,7 @@
       state = STATE_ACTIVE;
       callbacks = new ArrayList();
       callbackMap = new HashMap();
+      creationTime = System.currentTimeMillis();
    }
    
    Transaction(long id, Xid xid, TransactionRepository tr)
@@ -130,6 +134,12 @@
       this.xid = xid;
       this.repository = tr;
    }
+
+   Transaction(long id, Xid xid, TransactionRepository tr, long ct)
+   {
+      this(id, xid, tr);
+      creationTime = ct;
+   }
    
    // Public --------------------------------------------------------
    
@@ -194,24 +204,51 @@
        
       boolean onePhase = state != STATE_PREPARED;
       
-      if (firstCallback != null)
+      Iterator iter = null;
+
+      try
       {
-         firstCallback.beforeCommit(onePhase);
+         
+         if (firstCallback != null)
+         {
+            firstCallback.beforeCommit(onePhase);
+         }
+
+         iter = callbacks.iterator();
+
+         while (iter.hasNext())
+         {
+            TxCallback callback = (TxCallback)iter.next();
+
+            callback.beforeCommit(onePhase);
+         }
+
+         state = STATE_COMMITTED;
+
+         if (trace)
+         {
+            log.trace(this + " committed");
+         }
+
       }
-      
-      Iterator iter = callbacks.iterator();
-      
-      while (iter.hasNext())
+      catch (Exception e)
       {
-         TxCallback callback = (TxCallback)iter.next();
-         
-         callback.beforeCommit(onePhase);
+         // for one-phase commit, we need to rollback the message.
+         if (onePhase)
+         {
+            if (trace)
+            {
+               log.trace(this + " one-phase commit results in rollback.");
+            }
+            
+            rollback();
+            
+            throw new XAException(XAException.XA_RBOTHER);
+         }
+         // for a two-phase commit, we rethrow the exception
+         throw e;
       }
       
-      state = STATE_COMMITTED;
-      
-      if (trace) { log.trace(this + " committed"); }
-      
       iter = callbacks.iterator();
       
       if (trace) { log.trace(this + " executing after commit hooks"); }
@@ -288,8 +325,13 @@
       if (trace) { log.trace(this + " prepare process complete"); }
    }
    
-   public synchronized void rollback() throws Exception
+   public void rollback() throws Exception
    {
+      rollback(false);
+   }
+   
+   public synchronized void rollback(boolean recovered) throws Exception
+   {
       if (state == STATE_COMMITTED)
       {
          throw new TransactionException("Transaction already committed, cannot rollback");
@@ -336,7 +378,15 @@
       for(Iterator i = callbacks.iterator(); i.hasNext();)
       {
          TxCallback callback = (TxCallback)i.next();
-         callback.afterRollback(onePhase);
+         
+         if (callback instanceof TxCallbackEx)
+         {
+            ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered || onePhase);            
+         }
+         else
+         {
+            callback.afterRollback(onePhase);            
+         }
       }            
       
       callbacks = null;
@@ -385,6 +435,20 @@
    {
       this.state = state;
    }
+
+   public long getAge()
+   {
+      if (creationTime == Long.MIN_VALUE)
+      {
+         return creationTime;
+      }
+      return System.currentTimeMillis() - creationTime;
+   }
+
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
       
    public String toString()
    {



More information about the jboss-cvs-commits mailing list