[jboss-cvs] JBoss Messaging SVN: r3953 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP/src: etc/xmdesc and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 26 17:25:05 EDT 2008


Author: ataylor
Date: 2008-03-26 17:25:05 -0400 (Wed, 26 Mar 2008)
New Revision: 3953

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/server/default/deploy/ndb-persistence-service.xml
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1261 - support for clustered database

Added: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/server/default/deploy/ndb-persistence-service.xml	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/server/default/deploy/ndb-persistence-service.xml	2008-03-26 21:25:05 UTC (rev 3953)
@@ -0,0 +1,310 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+     MySQL Cluster (NDBCLUSTER engine) persistence deployment descriptor.
+
+     Tested with MySQL 5.0.27
+
+     $Id$
+ -->
+
+<server>
+
+   <!-- Persistence Manager MBean configuration
+       ======================================== -->
+
+   <mbean code="org.jboss.messaging.core.jmx.JDBCPersistenceManagerService"
+      name="jboss.messaging:service=PersistenceManager"
+      xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
+
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+
+      <!-- The datasource to use for the persistence manager -->
+
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+
+      <!-- If true will attempt to create tables and indexes on every start-up -->
+
+      <attribute name="CreateTablesOnStartup">true</attribute>
+
+      <!-- If true then we will automatically detect and reject duplicate messages sent during failover -->
+
+      <attribute name="DetectDuplicates">true</attribute>
+
+      <!-- The size of the id cache to use when detecting duplicate messages -->
+
+      <attribute name="IDCacheSize">500</attribute>
+
+      <attribute name="SqlProperties"><![CDATA[
+   CREATE_DUAL=CREATE TABLE JBM_DUAL (DUMMY INTEGER, PRIMARY KEY (DUMMY)) ENGINE = NDBCLUSTER
+   CREATE_MESSAGE_REFERENCE=CREATE TABLE JBM_MSG_REF (MESSAGE_ID BIGINT, CHANNEL_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(MESSAGE_ID, CHANNEL_ID)) ENGINE = NDBCLUSTER
+   CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID, STATE)
+   CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)
+   CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JBM_MSG_REF_PAGE_ORD ON JBM_MSG_REF (PAGE_ORD)
+   CREATE_IDX_MESSAGE_REF_MESSAGE_ID=CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)
+   CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY=CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)
+   CREATE_MESSAGE=CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, TYPE TINYINT, HEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, PRIMARY KEY (MESSAGE_ID)) ENGINE = NDBCLUSTER
+   CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = NDBCLUSTER
+   CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = NDBCLUSTER
+   CREATE_ID_CACHE=CREATE TABLE JBM_ID_CACHE (NODE_ID INTEGER, CNTR INTEGER, JBM_ID VARCHAR(255), PRIMARY KEY(NODE_ID, CNTR)) ENGINE = NDBCLUSTER
+   INSERT_DUAL=INSERT INTO JBM_DUAL VALUES (1)
+   CHECK_DUAL=SELECT 1 FROM JBM_DUAL
+   INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+   DELETE_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
+   UPDATE_MESSAGE_REF=UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
+   UPDATE_PAGE_ORDER=UPDATE JBM_MSG_REF SET PAGE_ORD = ? WHERE MESSAGE_ID=? AND CHANNEL_ID=?
+   COMMIT_MESSAGE_REF1=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='+'
+   COMMIT_MESSAGE_REF2=DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='-'
+   ROLLBACK_MESSAGE_REF1=DELETE FROM JBM_MSG_REF WHERE TRANSACTION_ID=? AND STATE='+'
+   ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
+   LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+   LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
+   UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
+   SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
+   UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
+   INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+   INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
+   UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
+   INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
+   MESSAGE_ID_COLUMN=MESSAGE_ID
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+   INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
+   DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
+   SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
+   SELECT_MESSAGE_ID_FOR_REF=SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '+' ORDER BY ORD
+   SELECT_MESSAGE_ID_FOR_ACK=SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD
+   UPDATE_COUNTER=UPDATE JBM_COUNTER SET NEXT_ID = ? WHERE NAME=?
+   SELECT_COUNTER=SELECT NEXT_ID FROM JBM_COUNTER WHERE NAME=? FOR UPDATE
+   INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
+   SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
+   UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
+   INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
+   LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
+      ]]></attribute>
+
+      <!-- The maximum number of parameters to include in a prepared statement -->
+
+      <attribute name="MaxParams">500</attribute>
+
+      <!-- This defines the retry strategy for failed SQL statements -->
+      <attribute name="UseNDBFailoverStrategy">true</attribute>
+   </mbean>
+
+   <!-- Messaging Post Office MBean configuration
+        ========================================= -->
+
+   <mbean code="org.jboss.messaging.core.jmx.MessagingPostOfficeService"
+      name="jboss.messaging:service=PostOffice"
+      xmbean-dd="xmdesc/MessagingPostOffice-xmbean.xml">
+
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+
+      <!-- The name of the post office -->
+
+      <attribute name="PostOfficeName">JMS post office</attribute>
+
+      <!-- The datasource used by the post office to access its binding information -->
+
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+
+      <!-- If true will attempt to create tables and indexes on every start-up -->
+
+      <attribute name="CreateTablesOnStartup">true</attribute>
+
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME)) ENGINE = NDBCLUSTER
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+      ]]></attribute>
+
+      <!-- This post office is clustered. If you don't want a clustered post office then set to false -->
+
+      <attribute name="Clustered">true</attribute>
+
+      <!-- All the remaining properties only have to be specified if the post office is clustered.
+           You can safely comment them out if your post office is non clustered -->
+
+      <!-- The JGroups group name that the post office will use -->
+
+      <attribute name="GroupName">${jboss.messaging.groupname:MessagingPostOffice}</attribute>
+
+      <!-- Max time to wait for state to arrive when the post office joins the cluster -->
+
+      <attribute name="StateTimeout">300000</attribute>
+
+      <!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
+
+      <attribute name="CastTimeout">300000</attribute>
+
+      <!-- Set this to true if you want failover of connections to occur when a node is shut down -->
+
+      <attribute name="FailoverOnNodeLeave">false</attribute>
+
+      <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
+
+      <!-- By default we use the TCP stack for data -->
+      <attribute name="DataChannelConfig">
+         <config>
+            <TCP start_port="7900"
+                 loopback="true"
+                 recv_buf_size="20000000"
+                 send_buf_size="640000"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 enable_bundling="false"
+                 use_send_queues="false"
+                 sock_conn_timeout="300"
+                 skip_suspected_members="true"
+                 use_concurrent_stack="true"
+                 thread_pool.enabled="true"
+                 thread_pool.min_threads="1"
+                 thread_pool.max_threads="200"
+                 thread_pool.keep_alive_time="5000"
+                 thread_pool.queue_enabled="true"
+                 thread_pool.queue_max_size="500"
+                 thread_pool.rejection_policy="run"
+                 oob_thread_pool.enabled="true"
+                 oob_thread_pool.min_threads="1"
+                 oob_thread_pool.max_threads="100"
+                 oob_thread_pool.keep_alive_time="5000"
+                 oob_thread_pool.queue_enabled="false"
+                 oob_thread_pool.queue_max_size="100"
+                 oob_thread_pool.rejection_policy="run"/>
+            <MPING timeout="5000"
+		         mcast_addr="${jboss.messaging.datachanneludpaddress,jboss.partition.udpGroup:228.6.6.6}"
+		         mcast_port="${jboss.messaging.datachanneludpport:45567}"
+		         ip_ttl="${jboss.messaging.ipttl:8}"
+		         num_initial_members="5"
+		         num_ping_requests="3"/>
+            <MERGE2 max_interval="100000" min_interval="20000"/>
+            <FD_SOCK/>
+            <VERIFY_SUSPECT timeout="1500"/>
+            <BARRIER/>
+            <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                           retransmit_timeout="300,600,1200,2400,4800"
+                           discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           max_bytes="400000"/>
+            <VIEW_SYNC avg_send_interval="10000"/>
+
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                shun="false" view_bundling="true"/>
+        </config>
+      </attribute>
+
+      <!-- JGroups stack configuration to use for the control channel - used for control messages -->
+
+      <!-- We use udp stack for the control channel -->
+      <attribute name="ControlChannelConfig">
+         <config>
+            <UDP
+                 mcast_addr="${jboss.messaging.controlchanneludpaddress,jboss.partition.udpGroup:228.7.7.7}"
+                 mcast_port="${jboss.messaging.controlchanneludpport:45568}"
+                 tos="8"
+                 ucast_recv_buf_size="20000000"
+                 ucast_send_buf_size="640000"
+                 mcast_recv_buf_size="25000000"
+                 mcast_send_buf_size="640000"
+                 loopback="false"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 ip_ttl="${jboss.messaging.ipttl:8}"
+                 enable_bundling="false"
+                 enable_diagnostics="true"
+                 thread_naming_pattern="cl"
+
+                 use_concurrent_stack="true"
+
+                 thread_pool.enabled="true"
+                 thread_pool.min_threads="1"
+                 thread_pool.max_threads="200"
+                 thread_pool.keep_alive_time="5000"
+                 thread_pool.queue_enabled="true"
+                 thread_pool.queue_max_size="1000"
+                 thread_pool.rejection_policy="Run"
+
+                 oob_thread_pool.enabled="true"
+                 oob_thread_pool.min_threads="1"
+                 oob_thread_pool.max_threads="8"
+                 oob_thread_pool.keep_alive_time="5000"
+                 oob_thread_pool.queue_enabled="false"
+                 oob_thread_pool.queue_max_size="100"
+                 oob_thread_pool.rejection_policy="Run"/>
+            <PING timeout="2000"
+                  num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    min_interval="20000"/>
+            <FD_SOCK />
+            <FD timeout="10000" max_tries="5" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" />
+            <BARRIER />
+            <pbcast.NAKACK use_stats_for_retransmission="false"
+                   exponential_backoff="150"
+                   use_mcast_xmit="true" gc_lag="0"
+                   retransmit_timeout="50,300,600,1200"
+                   discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           max_bytes="400000"/>
+            <VIEW_SYNC avg_send_interval="10000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                shun="false"
+                view_bundling="true"/>
+            <FC max_credits="500000"
+                    min_threshold="0.20"/>
+            <FRAG2 frag_size="60000"  />
+            <pbcast.STATE_TRANSFER/>
+            <pbcast.FLUSH timeout="20000"/>
+        </config>
+     </attribute>
+   </mbean>
+
+   <!-- Messaging JMS User Manager MBean config
+        ======================================= -->
+
+   <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
+      name="jboss.messaging:service=JMSUserManager"
+      xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+      <attribute name="CreateTablesOnStartup">true</attribute>
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_USER_TABLE=CREATE TABLE JBM_USER (USER_ID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USER_ID)) ENGINE = NDBCLUSTER
+CREATE_ROLE_TABLE=CREATE TABLE JBM_ROLE (ROLE_ID VARCHAR(32) NOT NULL, USER_ID VARCHAR(32) NOT NULL, PRIMARY KEY(USER_ID, ROLE_ID)) ENGINE = NDBCLUSTER
+SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JBM_USER WHERE USER_ID=?
+POPULATE.TABLES.1  = INSERT INTO JBM_USER (USER_ID, PASSWD) VALUES ('guest', 'guest')
+POPULATE.TABLES.2  = INSERT INTO JBM_USER (USER_ID, PASSWD) VALUES ('j2ee', 'j2ee')
+POPULATE.TABLES.3  = INSERT INTO JBM_USER (USER_ID, PASSWD, CLIENTID) VALUES ('john', 'needle', 'DurableSubscriberExample')
+POPULATE.TABLES.4  = INSERT INTO JBM_USER (USER_ID, PASSWD) VALUES ('nobody', 'nobody')
+POPULATE.TABLES.5  = INSERT INTO JBM_USER (USER_ID, PASSWD) VALUES ('dynsub', 'dynsub')
+POPULATE.TABLES.6  = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('guest','guest')
+POPULATE.TABLES.7  = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('j2ee','guest')
+POPULATE.TABLES.8  = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('john','guest')
+POPULATE.TABLES.9  = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('subscriber','john')
+POPULATE.TABLES.10 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('publisher','john')
+POPULATE.TABLES.11 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('publisher','dynsub')
+POPULATE.TABLES.12 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john')
+POPULATE.TABLES.13 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','dynsub')
+POPULATE.TABLES.14 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('noacc','nobody')
+      ]]></attribute>
+   </mbean>
+
+</server>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2008-03-26 20:43:15 UTC (rev 3952)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2008-03-26 21:25:05 UTC (rev 3953)
@@ -65,6 +65,12 @@
       <type>int</type>
    </attribute>
 
+   <attribute access="read-write" getMethod="isUseNDBFailoverStrategy" setMethod="setUseNDBFailoverStrategy">
+      <description>This changes the retry strategy for the persistence manager. If failure occurs on commit, we ignore the second failure and assume that the insert happened correctly</description>
+      <name>UseNDBFailoverStrategy</name>
+      <type>boolean</type>
+   </attribute>
+
    <attribute access="read-write" getMethod="isSupportsBlobOnSelect" setMethod="setSupportsBlobOnSelect">
       <description>Some databases don't support binding blobs on select clauses</description>
       <name>SupportsBlobOnSelect</name>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-03-26 20:43:15 UTC (rev 3952)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-03-26 21:25:05 UTC (rev 3953)
@@ -21,36 +21,6 @@
  */
 package org.jboss.messaging.core.impl;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.Xid;
-
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.logging.Logger;
@@ -66,6 +36,15 @@
 import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.Xid;
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
 /**
  * JDBC implementation of PersistenceManager
  *
@@ -126,9 +105,9 @@
          Properties sqlProperties, boolean createTablesOnStartup,
          boolean usingBatchUpdates, boolean usingBinaryStream,
          boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect,
-         boolean detectDuplicates, int idCacheSize)
+         boolean detectDuplicates, boolean useNDBFailoverStrategy, int idCacheSize)
    {
-      super(ds, tm, sqlProperties, createTablesOnStartup);
+      super(ds, tm, sqlProperties, createTablesOnStartup, useNDBFailoverStrategy);
 
       // usingBatchUpdates is currently ignored due to sketchy support from
       // databases
@@ -144,6 +123,7 @@
       this.detectDuplicates = detectDuplicates;
 
       this.idCacheSize = idCacheSize;
+
    }
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -152,7 +132,7 @@
          boolean usingTrailingByte, int maxParams, boolean supportsBlobSelect)
    {
       this (ds, tm, sqlProperties, createTablesOnStartup, usingBatchUpdates, usingBinaryStream, usingTrailingByte,
-            maxParams, supportsBlobSelect, false, 0);
+            maxParams, supportsBlobSelect, false, false, 0);
    }
 
    // MessagingComponent overrides ---------------------------------
@@ -1331,12 +1311,14 @@
 
       class AddReferenceRunner extends JDBCTxRunner2
       {
+         private Message message;
+         private boolean messagePersisted = false;
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
             PreparedStatement psInsertMessage = null;
 
-            Message m = ref.getMessage();
+            message = ref.getMessage();
 
             try
             {
@@ -1353,13 +1335,13 @@
                   log.trace("Inserted " + rows + " rows");
                }
 
-               if (!m.isPersisted())
+               if (!message.isPersisted())
                {
                   // First time so persist the message
                   psInsertMessage = conn
                         .prepareStatement(getSQLStatement("INSERT_MESSAGE"));
 
-                  storeMessage(m, psInsertMessage, true);
+                  storeMessage(message, psInsertMessage, true);
                   rows = psInsertMessage.executeUpdate();
 
                   if (trace)
@@ -1369,15 +1351,17 @@
 
                   log.trace("message Inserted/updated " + rows + " rows");
 
+
+
+                  if (message instanceof JBossMessage)
+                  {
+                     cacheID(conn, ((JBossMessage) message).getJMSMessageID());
+                  }
                   // Needs to be at the end - in case an exception is thrown in
                   // which case retry will be attempted and we want to insert it
                   // again
-                  m.setPersisted(true);
-
-                  if (m instanceof JBossMessage)
-                  {
-                     cacheID(conn, ((JBossMessage)m).getJMSMessageID());
-                  }
+                  message.setPersisted(true);
+                  messagePersisted = true;
                }
 
                return null;
@@ -1388,6 +1372,15 @@
                closeStatement(psInsertMessage);
             }
          }
+
+         public void rollback()
+         {
+            if(messagePersisted)
+            {
+               messagePersisted = false;
+               message.setPersisted(false);
+            }
+         }
       }
 
       if (tx != null)
@@ -1715,6 +1708,8 @@
    {
       class HandleBeforeCommit1PCRunner extends JDBCTxRunner2
       {
+         private List<Message> messagesStored;
+
          public Object doTransaction() throws Exception
          {
             // For one phase we simply add rows corresponding to the refs and
@@ -1728,7 +1723,7 @@
             PreparedStatement psInsertMessage = null;
             PreparedStatement psDeleteReference = null;
 
-            List<Message> messagesStored = new ArrayList<Message>();
+            messagesStored = new ArrayList<Message>();
 
             try
             {
@@ -1826,22 +1821,25 @@
 
                return null;
             }
-            catch (Exception e)
+            finally
             {
+               closeStatement(psReference);
+               closeStatement(psDeleteReference);
+               closeStatement(psInsertMessage);
+            }
+         }
+
+         public void rollback()
+         {
+            if(messagesStored != null)
+            {
                for (Iterator i = messagesStored.iterator(); i.hasNext();)
                {
                   Message msg = (Message) i.next();
 
                   msg.setPersisted(false);
                }
-               throw e;
             }
-            finally
-            {
-               closeStatement(psReference);
-               closeStatement(psDeleteReference);
-               closeStatement(psInsertMessage);
-            }
          }
       }
 
@@ -1918,6 +1916,8 @@
    {
       class HandleBeforePrepareRunner extends JDBCTxRunner2
       {
+         private List<Message> messagesStored;
+
          public Object doTransaction() throws Exception
          {
             // We insert a tx record and
@@ -1928,7 +1928,7 @@
             PreparedStatement psInsertMessage = null;
             PreparedStatement psUpdateReference = null;
 
-            List<Message> messagesStored = new ArrayList<Message>();
+            messagesStored = new ArrayList<Message>();
 
             try
             {
@@ -2024,22 +2024,25 @@
 
                return null;
             }
-            catch (Exception e)
+            finally
             {
+               closeStatement(psReference);
+               closeStatement(psInsertMessage);
+               closeStatement(psUpdateReference);
+            }
+         }
+
+         public void rollback()
+         {
+            if(messagesStored != null)
+            {
                for (Iterator i = messagesStored.iterator(); i.hasNext();)
                {
                   Message msg = (Message) i.next();
 
                   msg.setPersisted(false);
                }
-               throw e;
             }
-            finally
-            {
-               closeStatement(psReference);
-               closeStatement(psInsertMessage);
-               closeStatement(psUpdateReference);
-            }
          }
       }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2008-03-26 20:43:15 UTC (rev 3952)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2008-03-26 21:25:05 UTC (rev 3953)
@@ -21,25 +21,19 @@
  */
 package org.jboss.messaging.core.impl;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.MessagingComponent;
 
 import javax.sql.DataSource;
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
 
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.contract.MessagingComponent;
-import org.jboss.util.NestedSQLException;
-
 /**
  * Common functionality for messaging components that need to access a database.
  *
@@ -64,7 +58,9 @@
    private Map defaultDMLStatements;
    
    private Map defaultDDLStatements;
-       
+
+   private boolean useNDBFailoverStrategy = false;
+
    private boolean createTablesOnStartup = true;
       
    public JDBCSupport()
@@ -92,7 +88,14 @@
       
       this.createTablesOnStartup = createTablesOnStartup;
    }
-      
+
+    public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+                      boolean createTablesOnStartup, boolean useNDBFailoverStrategy)
+   {
+      this(ds, tm, sqlProperties, createTablesOnStartup);
+      this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+   }
+
    // MessagingComponent overrides ---------------------------------
    
    public void start() throws Exception
@@ -447,6 +450,8 @@
       
       private boolean getConnectionFailed;
 
+      private boolean getCommitFailed;
+
       public T execute() throws Exception
       {                    
          Transaction tx = tm.suspend();
@@ -514,12 +519,22 @@
             }
             catch (SQLException  e)
             {
+               rollback();
+               if(getCommitFailed && useNDBFailoverStrategy)
+               {
+                  log.warn("Ignoring SQL exception on retry after commit failed, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+                  return null;
+               }
                if (getConnectionFailed)
                {
                   //Do not retry - just throw the exception up
                   throw e;
                }
-               
+               //this is the X/Open SQLSTATE code that signifies a commit has failed with the state of the tx unknown.
+               if("08007".equals(e.getSQLState()) && useNDBFailoverStrategy)
+               {
+                  getCommitFailed = true;
+               }
                log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
 
                tries++;
@@ -535,6 +550,14 @@
          }
       }
 
+      /**
+       * Called whenever a SQLException is caught while running the transaction; override this to roll back your own work, such as resetting flags.
+       */
+      public void rollback()
+      {
+         //noop
+      }
+
       public abstract T doTransaction() throws Exception;
    }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2008-03-26 20:43:15 UTC (rev 3952)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2008-03-26 21:25:05 UTC (rev 3953)
@@ -21,13 +21,13 @@
  */
 package org.jboss.messaging.core.jmx;
 
-import javax.transaction.TransactionManager;
-
 import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.impl.JDBCPersistenceManager;
 import org.jboss.messaging.util.ExceptionUtil;
 
+import javax.transaction.TransactionManager;
+
 /**
  * A JDBCPersistenceManagerService
  *
@@ -59,6 +59,8 @@
 
    private int idCacheSize = 500;
 
+   private boolean useNDBFailoverStrategy = false;
+
    // Constructors --------------------------------------------------------
 
    public JDBCPersistenceManagerService()
@@ -91,7 +93,7 @@
             new JDBCPersistenceManager(ds, tm, sqlProperties,
                                        createTablesOnStartup, usingBatchUpdates,
                                        usingBinaryStream, usingTrailingByte, maxParams,
-                                       supportsBlobOnSelect, detectDuplicates, idCacheSize);
+                                       supportsBlobOnSelect, detectDuplicates, useNDBFailoverStrategy, idCacheSize);
 
          persistenceManager.start();
 
@@ -148,6 +150,16 @@
       this.maxParams = maxParams;
    }
 
+   public boolean isUseNDBFailoverStrategy()
+   {
+      return useNDBFailoverStrategy;
+   }
+
+   public void setUseNDBFailoverStrategy(boolean useNDBFailoverStrategy)
+   {
+      this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+   }
+
    public boolean isUsingBinaryStream()
    {
       return usingBinaryStream;




More information about the jboss-cvs-commits mailing list