[jboss-cvs] JBoss Messaging SVN: r3106 - in trunk/src/main/org/jboss/messaging/core/impl: postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 14 21:43:50 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-09-14 21:43:50 -0400 (Fri, 14 Sep 2007)
New Revision: 3106

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Encapsulating MessagingPostOffice's statements in Retry blocks as well
(these were failing on MySQL Cluster under certain circumstances)

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-15 01:43:50 UTC (rev 3106)
@@ -386,9 +386,9 @@
          throw new IllegalArgumentException("block size must be > 0");
       }
       
-      class ReserveIDBlockRunner extends JDBCTxRunner
+      class ReserveIDBlockRunner extends JDBCTxRunner<Long>
       {
-      	public Object doTransaction() throws Exception
+      	public Long doTransaction() throws Exception
    		{
             //	For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
             //construct the locks the row
@@ -456,7 +456,7 @@
    		}
       }
       
-      return (Long)new ReserveIDBlockRunner().executeWithRetry();
+      return new ReserveIDBlockRunner().executeWithRetry();
    }
          
    /*
@@ -2502,69 +2502,4 @@
       }
    }
    
-   private abstract class JDBCTxRunner
-   {   
-   	private static final int MAX_TRIES = 25;
-   	
-   	Connection conn;
-
-      TransactionWrapper wrap;
-         
-		public Object execute() throws Exception
-		{
-	      wrap = new TransactionWrapper();
-	      
-	      try
-	      {
-	         conn = ds.getConnection();
-	         
-	         return doTransaction();
-	      }
-	      catch (Exception e)
-	      {
-	         wrap.exceptionOccurred();
-	         throw e;
-	      }
-	      finally
-	      {	      		      
-	      	closeConnection(conn);
-	         wrap.end();
-	      }  
-		}
-		
-		public Object executeWithRetry() throws Exception
-		{
-	      int tries = 0;
-	      
-	      while (true)
-	      {
-	         try
-	         {
-	            Object res = execute();
-	            
-	            if (tries > 0)
-	            {
-	               log.warn("Update worked after retry");
-	            }
-	            return res;	            
-	         }
-	         catch (SQLException e)
-	         {       	
-	            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
-	            
-	            tries++;
-	            if (tries == MAX_TRIES)
-	            {
-	               log.error("Retried " + tries + " times, now giving up");
-	               throw new IllegalStateException("Failed to excecute transaction");
-	            }
-	            log.warn("Trying again after a pause");
-	            //Now we wait for a random amount of time to minimise risk of deadlock
-	            Thread.sleep((long)(Math.random() * 500));	         	
-	         }  
-	      }
-		}
-		
-		public abstract Object doTransaction() throws Exception;
-   }  
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2007-09-15 01:43:50 UTC (rev 3106)
@@ -24,6 +24,7 @@
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -364,4 +365,71 @@
          return failed;
       }
    }
+
+   protected abstract class JDBCTxRunner<T>
+   {
+   	private static final int MAX_TRIES = 25;
+
+   	protected Connection conn;
+
+      private TransactionWrapper wrap;
+
+      public T execute() throws Exception
+		{
+	      wrap = new TransactionWrapper();
+
+	      try
+	      {
+	         conn = ds.getConnection();
+
+	         return doTransaction();
+	      }
+	      catch (Exception e)
+	      {
+	         wrap.exceptionOccurred();
+	         throw e;
+	      }
+	      finally
+	      {
+	      	closeConnection(conn);
+	         wrap.end();
+	      }
+		}
+
+		public T executeWithRetry() throws Exception
+		{
+	      int tries = 0;
+
+	      while (true)
+	      {
+	         try
+	         {
+	            T res = execute();
+
+	            if (tries > 0)
+	            {
+	               log.warn("Update worked after retry");
+	            }
+	            return res;
+	         }
+	         catch (SQLException e)
+	         {
+	            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+
+	            tries++;
+	            if (tries == MAX_TRIES)
+	            {
+	               log.error("Retried " + tries + " times, now giving up");
+	               throw new IllegalStateException("Failed to excecute transaction");
+	            }
+	            log.warn("Trying again after a pause");
+	            //Now we wait for a random amount of time to minimise risk of deadlock
+	            Thread.sleep((long)(Math.random() * 500));
+	         }
+	      }
+		}
+
+		public abstract T doTransaction() throws Exception;
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-09-14 23:22:25 UTC (rev 3105)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-09-15 01:43:50 UTC (rev 3106)
@@ -2373,80 +2373,75 @@
    
    private Map getBindingsFromStorage() throws Exception
    {
-      Connection conn = null;
-      PreparedStatement ps  = null;
-      ResultSet rs = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-
-      Map bindings = new HashMap();
-      
-      try
+      class LoadBindings extends JDBCTxRunner<Map>
       {
-         conn = ds.getConnection();
+         public Map doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
+            ResultSet rs = null;
 
-         ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
+            Map bindings = new HashMap();
 
-         ps.setString(1, officeName);
-         
-         ps.setInt(2, thisNodeID);
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
 
-         rs = ps.executeQuery();
+               ps.setString(1, officeName);
 
-         while (rs.next())
-         {
-            String queueName = rs.getString(1);
-            String conditionText = rs.getString(2);
-            String selector = rs.getString(3);
+               ps.setInt(2, thisNodeID);
 
-            if (rs.wasNull())
-            {
-               selector = null;
-            }
+               rs = ps.executeQuery();
 
-            long channelID = rs.getLong(4);
-                       
-            boolean bindingClustered = rs.getString(5).equals("Y");
-            
-            boolean allNodes = rs.getString(6).equals("Y");
-            
-            //If the node is not clustered then we load the bindings as non clustered
-                    	
-            Filter filter = null;
-            
-            if (selector != null)
+               while (rs.next())
+               {
+                  String queueName = rs.getString(1);
+                  String conditionText = rs.getString(2);
+                  String selector = rs.getString(3);
+
+                  if (rs.wasNull())
+                  {
+                     selector = null;
+                  }
+
+                  long channelID = rs.getLong(4);
+
+                  boolean bindingClustered = rs.getString(5).equals("Y");
+
+                  boolean allNodes = rs.getString(6).equals("Y");
+
+                  //If the node is not clustered then we load the bindings as non clustered
+
+                  Filter filter = null;
+
+                  if (selector != null)
+                  {
+                     filter = filterFactory.createFilter(selector);
+                  }
+
+                  Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
+                                                   true, filter, bindingClustered && clustered);
+
+                  if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
+
+                  Condition condition = conditionFactory.createCondition(conditionText);
+
+                  Binding binding = new Binding(condition, queue, allNodes);
+
+                  bindings.put(queueName, binding);
+               }
+
+               return bindings;
+            }
+            finally
             {
-            	filter = filterFactory.createFilter(selector);
+               closeResultSet(rs);
+
+               closeStatement(ps);
             }
-            
-            Queue queue = new MessagingQueue(thisNodeID, queueName, channelID, ms, pm,
-                                             true, filter, bindingClustered && clustered);
-            
-            if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
-            
-            Condition condition = conditionFactory.createCondition(conditionText);
-            
-            Binding binding = new Binding(condition, queue, allNodes);
-            
-            bindings.put(queueName, binding);                        
          }
-         
-         return bindings;
       }
-      catch (Exception e)
-      {
-      	wrap.exceptionOccurred();
-      	throw e;
-      }
-      finally
-      {
-         closeResultSet(rs);
-         
-         closeStatement(ps);
-         
-         closeConnection(conn);
 
-         wrap.end();
-      }	
+      return new LoadBindings().executeWithRetry();
    }
    
    private void loadBindings() throws Exception
@@ -2483,95 +2478,91 @@
    }
     
 
-   private void insertBindingInStorage(Condition condition, Queue queue, boolean allNodes) throws Exception
+   private void insertBindingInStorage(final Condition condition, final Queue queue, final boolean allNodes) throws Exception
    {
-      Connection conn = null;
-      PreparedStatement ps  = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-
-      try
+      class InsertBindings extends JDBCTxRunner
       {
-         conn = ds.getConnection();
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
 
-         ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
 
-         ps.setString(1, officeName);
-         ps.setInt(2, thisNodeID);
-         ps.setString(3, queue.getName());
-         ps.setString(4, condition.toText());
-         String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
-         if (filterString != null)
-         {
-            ps.setString(5, filterString);
-         }
-         else
-         {
-            ps.setNull(5, Types.VARCHAR);
-         }
-         ps.setLong(6, queue.getChannelID());        
-         if (queue.isClustered())
-         {
-            ps.setString(7, "Y");
-         }
-         else
-         {
-            ps.setString(7, "N");
-         }
-         if (allNodes)
-         {
-         	ps.setString(8, "Y");
-         }
-         else
-         {
-         	ps.setString(8, "N");
-         }
+               ps.setString(1, officeName);
+               ps.setInt(2, thisNodeID);
+               ps.setString(3, queue.getName());
+               ps.setString(4, condition.toText());
+               String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
+               if (filterString != null)
+               {
+                  ps.setString(5, filterString);
+               }
+               else
+               {
+                  ps.setNull(5, Types.VARCHAR);
+               }
+               ps.setLong(6, queue.getChannelID());
+               if (queue.isClustered())
+               {
+                  ps.setString(7, "Y");
+               }
+               else
+               {
+                  ps.setString(7, "N");
+               }
+               if (allNodes)
+               {
+                  ps.setString(8, "Y");
+               }
+               else
+               {
+                  ps.setString(8, "N");
+               }
 
-         ps.executeUpdate();
+               ps.executeUpdate();
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+
+            return null;
+         }
       }
-      catch (Exception e)
-      {
-      	wrap.exceptionOccurred();
-      	throw e;
-      }
-      finally
-      {
-      	closeStatement(ps);
-      	closeConnection(conn);
-         wrap.end();
-      }
+
+      new InsertBindings().executeWithRetry();
    }
 
-   private boolean deleteBindingFromStorage(Queue queue) throws Exception
+   private boolean deleteBindingFromStorage(final Queue queue) throws Exception
    {
-      Connection conn = null;
-      PreparedStatement ps  = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-
-      try
+      class DeleteBindings extends JDBCTxRunner<Boolean>
       {
-         conn = ds.getConnection();
+         public Boolean doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
 
-         ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
 
-         ps.setString(1, officeName);
-         ps.setInt(2, queue.getNodeID());
-         ps.setString(3, queue.getName());
+               ps.setString(1, officeName);
+               ps.setInt(2, queue.getNodeID());
+               ps.setString(3, queue.getName());
 
-         int rows = ps.executeUpdate();
+               int rows = ps.executeUpdate();
 
-         return rows == 1;
+               return rows == 1;
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+         }
       }
-      catch (Exception e)
-      {
-      	wrap.exceptionOccurred();
-      	throw e;
-      }
-      finally
-      {
-      	closeStatement(ps);
-      	closeConnection(conn);
-         wrap.end();
-      }
+
+      return new DeleteBindings().executeWithRetry();
    }
 
    private boolean leaveMessageReceived(Integer nodeId) throws Exception




More information about the jboss-cvs-commits mailing list