[jboss-cvs] JBoss Messaging SVN: r3038 - in trunk/src: main/org/jboss/messaging/core/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 23 09:56:17 EDT 2007
Author: timfox
Date: 2007-08-23 09:56:17 -0400 (Thu, 23 Aug 2007)
New Revision: 3038
Modified:
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
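Message inserts now use a conditional insert (INSERT ... SELECT ... WHERE NOT EXISTS) instead of inserting unconditionally and handling primary key violations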
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-23 12:16:46 UTC (rev 3037)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-23 13:56:17 UTC (rev 3038)
@@ -34,6 +34,7 @@
<attribute name="UsingBatchUpdates">true</attribute>
<attribute name="SqlProperties"><![CDATA[
+ CREATE_DUAL=CREATE TABLE JBM_DUAL (DUMMY INTEGER, PRIMARY KEY (DUMMY)) ENGINE = INNODB
CREATE_MESSAGE_REFERENCE=CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, MESSAGE_ID BIGINT, TRANSACTION_ID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERY_COUNT INTEGER, SCHED_DELIVERY BIGINT, PRIMARY KEY(CHANNEL_ID, MESSAGE_ID)) ENGINE = INNODB
CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JBM_MSG_REF_TX ON JBM_MSG_REF (TRANSACTION_ID)
CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JBM_MSG_REF_ORD ON JBM_MSG_REF (ORD)
@@ -44,6 +45,7 @@
CREATE_IDX_MESSAGE_TIMESTAMP=CREATE INDEX JBM_MSG_REF_TIMESTAMP ON JBM_MSG (TIMESTAMP)
CREATE_TRANSACTION=CREATE TABLE JBM_TX (NODE_ID INTEGER, TRANSACTION_ID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTION_ID)) ENGINE = INNODB
CREATE_COUNTER=CREATE TABLE JBM_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME)) ENGINE = INNODB
+ INSERT_DUAL=INSERT INTO JBM_DUAL VALUES (1)
INSERT_MESSAGE_REF=INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
UPDATE_MESSAGE_REF=UPDATE JBM_MSG_REF SET TRANSACTION_ID=?, STATE='-' WHERE MESSAGE_ID=? AND CHANNEL_ID=? AND STATE='C'
@@ -61,7 +63,8 @@
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
- INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) SELECT (?, ?, ?, ?, ?, ?, ?, ?) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
MESSAGE_EXISTS=SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ? FOR UPDATE
REAP_MESSAGES=DELETE FROM JBM_MSG WHERE TIMESTAMP < ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-23 12:16:46 UTC (rev 3037)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-08-23 13:56:17 UTC (rev 3038)
@@ -135,6 +135,8 @@
super.start();
Connection conn = null;
+
+ PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
@@ -154,6 +156,27 @@
" Using an isolation level more strict than READ_COMMITTED may lead to deadlock.\n";
log.warn(warn);
}
+
+ //Now we need to insert a row in the DUAL table if it doesn't contain one already
+ ps = conn.prepareStatement(this.getSQLStatement("INSERT_DUAL"));
+
+ try
+ {
+ int rows = ps.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows into dual"); }
+ }
+ catch (SQLException e)
+ {
+ if (e.getSQLState().equals("23000"))
+ {
+ //Ignore PK violation - since might already be inserted
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
catch (Exception e)
{
@@ -162,13 +185,11 @@
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ closeStatement(ps);
+ closeConnection(conn);
wrap.end();
}
-
+
log.debug(this + " started");
}
@@ -531,14 +552,12 @@
PreparedStatement psInsertReference = null;
PreparedStatement psInsertMessage = null;
- List<Message> pagedMessages = new ArrayList<Message>();
-
try
{
Iterator iter = references.iterator();
psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
while (iter.hasNext())
{
@@ -563,63 +582,19 @@
//Maybe we need to persist the message itself
Message m = ref.getMessage();
- synchronized (m)
- {
- if (!m.isPersisted())
- {
- //The message might actually still already exist (despite pagedcount=0) due to it already being paged
- //so we insert and ignore key violations
-
- storeMessage(m, psInsertMessage);
-
- try
- {
- rows = psInsertMessage.executeUpdate();
- }
- catch (SQLException e)
- {
- if (e.getSQLState().equals("23000"))
- {
- //This is a primary key violation
- //It might legitimately occur if the ref has been paged to two channels so it is not
- //left in memory any more, then loaded by one channel
- //The paged count would then be one, not two
- if (trace) { log.trace("Primary key violation detected", e); }
-
- //When we retry we don't want to insert again, so set persisted to true
- m.setPersisted(true);
-
- //Throw the exception so the tx is retried - the next time it won't try and insert the message
- violationOk = true;
- }
-
- throw e;
- }
-
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
- m.setPersisted(true);
-
- pagedMessages.add(m);
- }
- }
+ //We always try and insert the message, even if it might already be paged -
+ //we use a conditional insert though, so it won't insert it if it already exists
+
+ storeMessage(m, psInsertMessage);
+ psInsertMessage.setLong(9, m.getMessageID());
+
+ rows = psInsertMessage.executeUpdate();
+
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
}
return null;
}
- catch (Exception e)
- {
- //The tx will be rolled back
- //so we need to set the messages to not persisted
- for (Iterator iter = pagedMessages.iterator(); iter.hasNext(); )
- {
- Message msg = (Message)iter.next();
-
- msg.setPersisted(false);
- }
-
- throw e;
- }
finally
{
closeStatement(psInsertReference);
@@ -642,8 +617,6 @@
{
PreparedStatement psDeleteReference = null;
- List<Message> depagedReferences = new ArrayList<Message>();
-
try
{
psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
@@ -658,29 +631,11 @@
int rows = psDeleteReference.executeUpdate();
- if (trace) { log.trace("Deleted " + rows + " rows"); }
-
- //There is a small possibility that the ref is depaged here, then paged again, before this flag is set
- //and the tx is committed so the message be attempted to be inserted twice but this should be ok
- //since we ignore key violations on message insert
-
- ref.getMessage().setPersisted(false);
-
- depagedReferences.add(ref.getMessage());
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
}
return null;
- }
- catch (Exception e)
- {
- for (Iterator iter = depagedReferences.iterator(); iter.hasNext(); )
- {
- Message msg = (Message)iter.next();
-
- msg.setPersisted(true);
- }
- throw e;
- }
+ }
finally
{
closeStatement(psDeleteReference);
@@ -2023,6 +1978,7 @@
protected Map getDefaultDDLStatements()
{
Map<String, String> map = new LinkedHashMap<String, String>();
+ map.put("CREATE_DUAL", "CREATE TABLE JBM_DUAL (DUMMY INTEGER)");
//Message reference
map.put("CREATE_MESSAGE_REFERENCE",
"CREATE TABLE JBM_MSG_REF (CHANNEL_ID BIGINT, " +
@@ -2054,6 +2010,7 @@
protected Map getDefaultDMLStatements()
{
Map<String, String> map = new LinkedHashMap<String, String>();
+ map.put("INSERT_DUAL", "INSERT INTO JBM_DUAL VALUES (1) WHERE NOT EXISTS (SELECT * FROM JBM_DUAL)");
//Message reference
map.put("INSERT_MESSAGE_REF",
"INSERT INTO JBM_MSG_REF (CHANNEL_ID, MESSAGE_ID, TRANSACTION_ID, STATE, ORD, PAGE_ORD, DELIVERY_COUNT, SCHED_DELIVERY) " +
@@ -2091,6 +2048,11 @@
"INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
"TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)" );
+ map.put("INSERT_MESSAGE_CONDITIONAL",
+ "INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, " +
+ "TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE) " +
+ "SELECT (?, ?, ?, ?, ?, ?, ?, ?) " +
+ "FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)");
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("MESSAGE_EXISTS", "SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?");
map.put("REAP_MESSAGES", "DELETE FROM JBM_MSG WHERE TIMESTAMP <= ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = JBM_MSG.MESSAGE_ID)");
@@ -2418,9 +2380,7 @@
Connection conn;
TransactionWrapper wrap;
-
- boolean violationOk;
-
+
public Object execute() throws Exception
{
wrap = new TransactionWrapper();
@@ -2460,26 +2420,18 @@
return res;
}
catch (SQLException e)
- {
- if (e.getSQLState().equals("23000") && violationOk)
- {
- //Primary key violation - this is ok - we retry immediately
- violationOk = false;
- }
- else
- {
- log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
-
- tries++;
- if (tries == MAX_TRIES)
- {
- log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to excecute transaction");
- }
- log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
- Thread.sleep((long)(Math.random() * 500));
- }
+ {
+ log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
+
+ tries++;
+ if (tries == MAX_TRIES)
+ {
+ log.error("Retried " + tries + " times, now giving up");
+ throw new IllegalStateException("Failed to excecute transaction");
+ }
+ log.warn("Trying again after a pause");
+ //Now we wait for a random amount of time to minimise risk of deadlock
+ Thread.sleep((long)(Math.random() * 500));
}
}
}
More information about the jboss-cvs-commits mailing list