[jboss-cvs] JBoss Messaging SVN: r3108 - in trunk/src: main/org/jboss/messaging/core/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Sep 17 13:22:10 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-09-17 13:22:09 -0400 (Mon, 17 Sep 2007)
New Revision: 3108

Added:
   trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
Modified:
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1071
(I'm also placing getMessages in a retry block, because of possible issues on MySQL Cluster)

Added: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	                        (rev 0)
+++ trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	2007-09-17 17:22:09 UTC (rev 3108)
@@ -0,0 +1,269 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+     MySQL Cluster (NDBCLUSTER engine) persistence deployment descriptor.
+
+     Tested with MySQL 4.1.22
+
+     $Id$
+ -->
+
+<server>
+
+   <!-- Persistence Manager MBean configuration
+       ======================================== -->
+
+   <mbean code="org.jboss.messaging.core.jmx.JDBCPersistenceManagerService"
+      name="jboss.messaging:service=PersistenceManager"
+      xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
+
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+
+      <!-- The datasource to use for the persistence manager -->
+
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+
+      <!-- If true will attempt to create tables and indexes on every start-up -->
+
+      <attribute name="CreateTablesOnStartup">true</attribute>
+
+      <!-- If true then will use JDBC batch updates -->
+
+      <attribute name="UsingBatchUpdates">true</attribute>
+
+      <attribute name="SqlProperties"><![CDATA[
+   CREATE_DUAL=CREATE TABLE JBM_DUAL (DUMMY INTEGER, PRIMARY KEY (DUMMY)) ENGINE = NDBCLUSTER
+   CREATE_MESSAGE_REFERENCE=CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, MESSAGE_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(CHANNEL_ID, MESSAGE_ID)) ENGINE = NDBCLUSTER
+   CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID)
+   CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)
+   CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JBM_MSG_REF_PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)
+   CREATE_IDX_MESSAGE_REF_MESSAGE_ID=CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)
+   CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY=CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)
+   CREATE_MESSAGE=CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, HEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, TYPE TINYINT, PRIMARY KEY (MESSAGE_ID)) ENGINE = NDBCLUSTER
+   CREATE_IDX_MESSAGE_TIMESTAMP=CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)
+   CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = NDBCLUSTER
+   CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = NDBCLUSTER
+   INSERT_DUAL=INSERT INTO JBM_DUAL VALUES (1)
+   CHECK_DUAL=SELECT 1 FROM JBM_DUAL
+   INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+   DELETE_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
+   UPDATE_MESSAGE_REF=UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
+   UPDATE_PAGE_ORDER=UPDATE JBM_MSG_REF SET PAGE_ORD = ? WHERE MESSAGE_ID=? AND CHANNEL_ID=?
+   COMMIT_MESSAGE_REF1=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='+'
+   COMMIT_MESSAGE_REF2=DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='-'
+   ROLLBACK_MESSAGE_REF1=DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='+'
+   ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
+   LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+   LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
+   UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
+   SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
+   SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
+   UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
+   UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
+   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+   INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
+   UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
+   INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
+   SUPPORTS_BLOB_ON_SELECT=Y
+   MESSAGE_ID_COLUMN=MESSAGE_ID
+   REAP_MESSAGES=DELETE FROM JBM_MSG WHERE TIMESTAMP < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
+   INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
+   DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
+   SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
+   SELECT_MESSAGE_ID_FOR_REF=SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '+' ORDER BY ORD
+   SELECT_MESSAGE_ID_FOR_ACK=SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD
+   UPDATE_COUNTER=UPDATE JBM_COUNTER SET NEXT_ID = ? WHERE NAME=?
+   SELECT_COUNTER=SELECT NEXT_ID FROM JBM_COUNTER WHERE NAME=? FOR UPDATE
+   INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
+   SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
+   UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+      ]]></attribute>
+
+      <!-- The maximum number of parameters to include in a prepared statement -->
+
+      <attribute name="MaxParams">500</attribute>
+
+      <attribute name="ReaperPeriod">5000</attribute>
+   </mbean>
+
+   <!-- Messaging Post Office MBean configuration
+        ========================================= -->
+
+   <mbean code="org.jboss.messaging.core.jmx.MessagingPostOfficeService"
+      name="jboss.messaging:service=PostOffice"
+      xmbean-dd="xmdesc/MessagingPostOffice-xmbean.xml">
+
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+
+      <!-- The name of the post office -->
+
+      <attribute name="PostOfficeName">JMS post office</attribute>
+
+      <!-- The datasource used by the post office to access its binding information -->
+
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+
+      <!-- If true will attempt to create tables and indexes on every start-up -->
+
+      <attribute name="CreateTablesOnStartup">true</attribute>
+
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME)) ENGINE = NDBCLUSTER
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+      ]]></attribute>
+
+      <!-- This post office is clustered. If you don't want a clustered post office then set to false -->
+
+      <attribute name="Clustered">true</attribute>
+
+      <!-- All the remaining properties only have to be specified if the post office is clustered.
+           You can safely comment them out if your post office is non-clustered -->
+
+      <!-- The JGroups group name that the post office will use -->
+
+      <attribute name="GroupName">MessagingPostOffice</attribute>
+
+      <!-- Max time to wait for state to arrive when the post office joins the cluster -->
+
+      <attribute name="StateTimeout">5000</attribute>
+
+      <!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
+
+      <attribute name="CastTimeout">50000</attribute>
+
+      <!-- Enable this when the JGroups multiplexer comes of age
+      <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
+      <attribute name="ControlChannelName">udp-sync</attribute>
+      <attribute name="DataChannelName">udp</attribute>
+      <attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
+      -->
+
+      <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
+
+      <attribute name="DataChannelConfig">
+         <config>
+            <UDP
+               mcast_addr="228.8.8.8"
+               mcast_port="45567"
+               tos="8"
+               ucast_recv_buf_size="20000000"
+               ucast_send_buf_size="640000"
+               mcast_recv_buf_size="25000000"
+               mcast_send_buf_size="640000"
+               loopback="false"
+               discard_incompatible_packets="true"
+               max_bundle_size="64000"
+               max_bundle_timeout="30"
+               use_incoming_packet_handler="true"
+               use_outgoing_packet_handler="false"
+               ip_ttl="2"
+               down_thread="false" up_thread="false"
+               enable_bundling="false"/>
+            <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+            <MERGE2 max_interval="100000" down_thread="false" min_interval="20000" up_thread="false"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK
+               max_xmit_size="60000"
+               use_mcast_xmit="false"
+               gc_lag="0"
+               retransmit_timeout="300,600,1200,2400,4800"
+               down_thread="false" up_thread="false"
+               discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"
+                     down_thread="false" up_thread="false"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="false"
+                        view_bundling="true"
+                        view_ack_collection_timeout="5000"/>
+            <FC max_credits="2000000" down_thread="false" up_thread="false"
+                min_threshold="0.10"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+
+         </config>
+      </attribute>
+
+      <!-- JGroups stack configuration to use for the control channel - used for control messages -->
+
+      <attribute name="ControlChannelConfig">
+         <config>
+            <UDP
+                 mcast_addr="228.8.8.8"
+                 mcast_port="45568"
+                 tos="8"
+                 ucast_recv_buf_size="20000000"
+                 ucast_send_buf_size="640000"
+                 mcast_recv_buf_size="25000000"
+                 mcast_send_buf_size="640000"
+                 loopback="false"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 use_outgoing_packet_handler="false"
+                 ip_ttl="2"
+                 down_thread="false" up_thread="false"
+                 enable_bundling="false"/>
+            <PING timeout="2000"
+                  down_thread="false" up_thread="false" num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                           use_mcast_xmit="false" gc_lag="0"
+                           retransmit_timeout="300,600,1200,2400,4800"
+                           down_thread="false" up_thread="false"
+                           discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"
+                     down_thread="false" up_thread="false"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000" use_flush="true" flush_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="false"
+                        view_bundling="true"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" flush_timeout="3000"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false" timeout="20000" auto_flush_conf="false"/>
+        </config>
+     </attribute>
+
+   </mbean>
+
+   <!-- Messaging JMS User Manager MBean config
+        ======================================= -->
+
+   <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
+      name="jboss.messaging:service=JMSUserManager"
+      xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+      <attribute name="CreateTablesOnStartup">true</attribute>
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_USER_TABLE=CREATE TABLE JBM_USER (USER_ID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USER_ID)) ENGINE = NDBCLUSTER
+CREATE_ROLE_TABLE=CREATE TABLE JBM_ROLE (ROLE_ID VARCHAR(32) NOT NULL, USER_ID VARCHAR(32) NOT NULL, PRIMARY KEY(USER_ID, ROLE_ID)) ENGINE = NDBCLUSTER
+SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JBM_USER WHERE USER_ID=?
+POPULATE.TABLES.1=INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('dilbert','dogbert','dilbert-id')
+      ]]></attribute>
+   </mbean>
+
+</server>


Property changes on: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-17 16:29:52 UTC (rev 3107)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-17 17:22:09 UTC (rev 3108)
@@ -468,121 +468,126 @@
     * e.g. Oracle ARRAY types in Oracle which can be submitted as a param to an Oracle prepared statement
     * Although this would all be DB specific.
     */
-   public List getMessages(List messageIds) throws Exception
+   public List getMessages(final List messageIds) throws Exception
    {
       if (trace) { log.trace("Getting batch of messages for " + messageIds); }
-      
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      
-      try
+
+      class GetMessageListTX extends JDBCTxRunner<List>
       {
-         conn = ds.getConnection();
-         
-         Iterator iter = messageIds.iterator();
-         
-         int size = messageIds.size();
-         
-         int count = 0;
-         
-         List<Message> msgs = new ArrayList<Message>();
-         
-         while (iter.hasNext())
+
+         public List<Message> doTransaction() throws Exception
          {
-            if (ps == null)
+            PreparedStatement ps = null;
+            ResultSet rs = null;
+
+            try
             {
-               //PreparedStatements are cached in the JCA layer so we will never actually have more than
-               //100 distinct ones            
-               int numParams;
-               if (count < (size / maxParams) * maxParams)
+               Iterator iter = messageIds.iterator();
+
+               int size = messageIds.size();
+
+               int count = 0;
+
+               List<Message> msgs = new ArrayList<Message>();
+
+               while (iter.hasNext())
                {
-                  numParams = maxParams;
-               }
-               else
-               {
-                  numParams = size % maxParams;
-               }
-               StringBuffer buff = new StringBuffer(getSQLStatement("LOAD_MESSAGES"));
-               buff.append(" WHERE ").append(getSQLStatement("MESSAGE_ID_COLUMN")).append(" IN (");
-               for (int i = 0; i < numParams; i++)
-               {
-                  buff.append("?");
-                  if (i < numParams - 1)
+                  if (ps == null)
                   {
-                     buff.append(",");
+                     //PreparedStatements are cached in the JCA layer so we will never actually have more than
+                     //100 distinct ones
+                     int numParams;
+                     if (count < (size / maxParams) * maxParams)
+                     {
+                        numParams = maxParams;
+                     }
+                     else
+                     {
+                        numParams = size % maxParams;
+                     }
+                     StringBuffer buff = new StringBuffer(getSQLStatement("LOAD_MESSAGES"));
+                     buff.append(" WHERE ").append(getSQLStatement("MESSAGE_ID_COLUMN")).append(" IN (");
+                     for (int i = 0; i < numParams; i++)
+                     {
+                        buff.append("?");
+                        if (i < numParams - 1)
+                        {
+                           buff.append(",");
+                        }
+                     }
+                     buff.append(")");
+                     ps = conn.prepareStatement(buff.toString());
+
+                     if (trace)
+                     {
+                        log.trace(buff.toString());
+                     }
                   }
+
+                  long msgId = ((Long)iter.next()).longValue();
+
+                  ps.setLong((count % maxParams) + 1, msgId);
+
+                  count++;
+
+                  if (!iter.hasNext() || count % maxParams == 0)
+                  {
+                     rs = ps.executeQuery();
+
+                     while (rs.next())
+                     {
+                        long messageId = rs.getLong(1);
+
+                        boolean reliable = rs.getString(2).equals("Y");
+
+                        long expiration = rs.getLong(3);
+
+                        long timestamp = rs.getLong(4);
+
+                        byte priority = rs.getByte(5);
+
+                        byte[] bytes = getBytes(rs, 6);
+
+                        HashMap headers = bytesToMap(bytes);
+
+                        byte[] payload = getBytes(rs, 7);
+
+                        byte type = rs.getByte(8);
+
+                        Message m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
+                                                                 headers, payload, type);
+                        msgs.add(m);
+                     }
+
+                     rs.close();
+                     rs = null;
+
+                     ps.close();
+                     ps = null;
+                  }
                }
-               buff.append(")");
-               ps = conn.prepareStatement(buff.toString());
-               
-               if (trace)
-               {
-                  log.trace(buff.toString());
-               }
+
+               if (trace) { log.trace("Loaded " + msgs.size() + " messages in total"); }
+
+               return msgs;
             }
-            
-            long msgId = ((Long)iter.next()).longValue();
-            
-            ps.setLong((count % maxParams) + 1, msgId);
-            
-            count++;
-            
-            if (!iter.hasNext() || count % maxParams == 0)
+            catch (Exception e)
             {
-               rs = ps.executeQuery();
-               
-               while (rs.next())
-               {       
-                  long messageId = rs.getLong(1);
-                  
-                  boolean reliable = rs.getString(2).equals("Y");
-                  
-                  long expiration = rs.getLong(3);
-                  
-                  long timestamp = rs.getLong(4);
-                  
-                  byte priority = rs.getByte(5);        
-                  
-                  byte[] bytes = getBytes(rs, 6);
-                  
-                  HashMap headers = bytesToMap(bytes);
-                  
-                  byte[] payload = getBytes(rs, 7);
-                  
-                  byte type = rs.getByte(8);
-                  
-                  Message m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
-                                                           headers, payload, type);
-                  msgs.add(m);
-               }
-               
-               rs.close();
-               rs = null;
-               
-               ps.close();
-               ps = null;
+               throw e;
             }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+            }
          }
-         
-         if (trace) { log.trace("Loaded " + msgs.size() + " messages in total"); }
+      }
 
-         return msgs;
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-      	closeResultSet(rs);
-      	closeStatement(ps);
-      	closeConnection(conn);
-         wrap.end();
-      }
-   }  
+
+      return new GetMessageListTX().executeWithRetry();
+
+      
+   }
    
        
    // Related to paging functionality




More information about the jboss-cvs-commits mailing list