[jboss-cvs] JBoss Messaging SVN: r2892 - in trunk/src/main/org/jboss: jms/tx and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 13 12:55:14 EDT 2007


Author: timfox
Date: 2007-07-13 12:55:14 -0400 (Fri, 13 Jul 2007)
New Revision: 2892

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
Log:
Fixed issue with 2pc rollback


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-13 16:55:14 UTC (rev 2892)
@@ -44,6 +44,7 @@
 import org.jboss.jms.delegate.BrowserDelegate;
 import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.DeliveryRecovery;
 import org.jboss.jms.delegate.SessionEndpoint;
 import org.jboss.jms.destination.JBossDestination;
@@ -1528,7 +1529,20 @@
       for(Iterator i = acks.iterator(); i.hasNext(); )
       {
          Ack ack = (Ack)i.next();
+         
          Long id = new Long(ack.getDeliveryID());
+         
+         //TODO - do this more elegantly
+         if (ack instanceof DeliveryInfo)
+         {
+         	if (!((DeliveryInfo)ack).isShouldAck())
+         	{
+         		//If we are in VM then acks for non durable subs will still exist - this
+         		//won't happen remotely since they are not written to the wire
+         		continue;
+         	}
+         }
+         
          DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
          
          if (rec == null)

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-07-13 16:55:14 UTC (rev 2892)
@@ -250,45 +250,6 @@
 
    // Streamable implementation ---------------------------------
    
-   //For message consumed using a non durable subscriber - we don't need to ack to the server
-   //so we remove them here
-   //TODO this could be optimised to prevent this extra removal stage before sending
-   public void removeUnnecessaryAcks()
-   {
-   	if (removeAcks)
-   	{
-	   	Iterator iter = sessionStatesMap.values().iterator();
-	
-	      while (iter.hasNext())
-	      {
-	         SessionTxState state = (SessionTxState)iter.next();
-	
-	         List acks = state.getAcks();
-	
-	         Iterator iter2 = acks.iterator();
-	         
-	         List newAcks = new ArrayList();
-	
-	         while (iter2.hasNext())
-	         {
-	            DeliveryInfo ack = (DeliveryInfo)iter2.next();
-	
-	            if (ack.isShouldAck())
-	            {
-	            	if (newAcks == null)
-	            	{
-	            		newAcks = new ArrayList();
-	            	}
-	            	newAcks.add(ack);
-	            }
-	         }
-	         
-	         state.setAcks(newAcks);
-	      }
-   	}
-   }
-   
-
    public void write(DataOutputStream out) throws Exception
    {
       out.writeByte(state);
@@ -326,17 +287,22 @@
 
             List acks = state.getAcks();
 
-            out.writeInt(acks.size());
-
             iter2 = acks.iterator();
 
             while (iter2.hasNext())
             {
                DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
-               //We only need the delivery id written
-               out.writeLong(ack.getMessageProxy().getDeliveryId());
+               
+               //We don't want to send acks for things like non durable subs which will have been already acked
+               if (ack.isShouldAck())
+               {
+               	//We only need the delivery id written
+               	out.writeLong(ack.getMessageProxy().getDeliveryId());
+               }
             }
+            
+            //Marker for end of acks
+            out.writeLong(Long.MIN_VALUE);
          }
       }
    }
@@ -375,13 +341,11 @@
             sessionState.addMessage(msg);
          }
 
-         int numAcks = in.readInt();
-
-         for (int j = 0; j < numAcks; j++)
+         long l;
+         
+         while ((l = in.readLong()) != Long.MIN_VALUE)
          {
-            long ack = in.readLong();
-
-            sessionState.addAck(new DefaultAck(ack));
+         	sessionState.addAck(new DefaultAck(l));
          }
       }
    }

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-07-13 16:55:14 UTC (rev 2892)
@@ -205,8 +205,6 @@
       
       try
       {
-      	request.state.removeUnnecessaryAcks();
-      	
          connection.sendTransaction(request, false);
          
          // If we get this far we can remove the transaction
@@ -617,6 +615,10 @@
             
             del.redeliver(acks);
          }
+         else
+         {
+         	log.info("There are no messages to redeliver");
+         }
       }
    }
    
@@ -630,11 +632,6 @@
    {
       try
       {
-      	if (request.state != null)
-      	{
-      		request.state.removeUnnecessaryAcks();
-      	}
-      	
          connection.sendTransaction(request, false);
       }
       catch (Throwable t)

Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java	2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java	2007-07-13 16:55:14 UTC (rev 2892)
@@ -67,17 +67,13 @@
 
    protected void startService() throws Exception
    {
+   	InitialContext ic = null;
       try
       {
+         ic = new InitialContext();
+         ds = (DataSource)ic.lookup(dataSourceJNDIName);
          if (ds == null)
          {
-            InitialContext ic = new InitialContext();
-            ds = (DataSource)ic.lookup(dataSourceJNDIName);
-            ic.close();
-         }
-
-         if (ds == null)
-         {
             throw new IllegalStateException("No DataSource found. This service dependencies must " +
                                             "have not been enforced correctly!");
          }
@@ -87,6 +83,13 @@
       {
          throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
       }
+      finally
+      {
+      	if (ic != null)
+      	{
+      		ic.close();
+      	}
+      }
    }
 
    protected void stopService() throws Exception




More information about the jboss-cvs-commits mailing list