[jboss-svn-commits] JBL Code SVN: r15712 - in labs/jbossesb/workspace/dbevenius/redeliver/product: services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Oct 10 03:39:45 EDT 2007
Author: beve
Date: 2007-10-10 03:39:44 -0400 (Wed, 10 Oct 2007)
New Revision: 15712
Added:
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java
Modified:
labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java
labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java
Log:
Changes for SJ custom redelivery.
Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java 2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -24,4 +24,5 @@
* @throws MessageStoreException
*/
public boolean redeliver(URI uid) throws MessageStoreException;
+ public boolean redeliver(URI uid,String classification) throws MessageStoreException;
}
Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -299,24 +299,34 @@
}
}
}
+
+ /**
+ *
+ */
+ public boolean redeliver(URI uuid) throws MessageStoreException
+ {
+ return redeliver(uuid, RedeliverStore.CLASSIFICATION_RDLVR);
+ }
+
/**
*
*/
- public boolean redeliver(URI uuid) throws MessageStoreException
+ public boolean redeliver(URI uuid,String classification) throws MessageStoreException
{
boolean isDelivered=false;
boolean error=false;
-
Connection con = null;
try
{
con = mgr.getConnection();
con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
con.setAutoCommit(false);
+// con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+
+ Message message=select(uuid, classification, con);
- Message message=select(uuid, con);
-
- if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
+ int result = delete(uuid, classification, con);
+ if (message!=null && result == 1) {
//now any good db should have set a read lock on this record, until we commit.
//if exception is thrown up the delivery count on the message
//if exceeds the maxcount then update the classification to DLQ.
@@ -332,19 +342,19 @@
if (isDelivered) {
//the message is delivered, we're good so remove it from the store
- delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+ int secondDelete = delete(uuid, classification , con);
} else {
//the message was not delivered
if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
//appearantly it was the first time
message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
- insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+ insert(uuid, message, classification, "FALSE", con);
} else {
Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
//up the count
message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
- insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+ insert(uuid, message, CLASSIFICATION_RDLVR, "FALSE", con);
} else {
//undeliverable, send to the DLQ
insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
@@ -376,6 +386,7 @@
try {
con.close();
} catch (Exception e2) {
+ e2.printStackTrace();
logger.error(e2);
}
}
@@ -389,18 +400,27 @@
{
Message message=null;
String selectSql = "select * from "+tableName+" where uuid=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- ResultSet rs = selectStmt.executeQuery();
- if (rs.next()) {
- try {
- message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
- } catch (Exception e) {
- throw new MessageStoreException(e);
- }
+ PreparedStatement selectStmt = null;
+ ResultSet rs = null;
+ try
+ {
+
+ selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ rs = selectStmt.executeQuery();
+ if (rs.next()) {
+ try {
+ message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ }
}
- rs.close();
- selectStmt.close();
+ finally
+ {
+ if ( rs != null ) rs.close();
+ if ( selectStmt != null ) selectStmt.close();
+ }
return message;
}
@@ -409,19 +429,27 @@
{
Message message=null;
String selectSql = "select * from "+tableName+" where uuid=? and classification=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- selectStmt.setObject(2, classification);
- ResultSet rs = selectStmt.executeQuery();
- if (rs.next()) {
- try {
- message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
- } catch (Exception e) {
- throw new MessageStoreException(e);
- }
+ PreparedStatement selectStmt = null;
+ ResultSet rs = null;
+ try
+ {
+ selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ selectStmt.setObject(2, classification);
+ rs = selectStmt.executeQuery();
+ if (rs.next()) {
+ try {
+ message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ }
}
- rs.close();
- selectStmt.close();
+ finally
+ {
+ if ( rs != null ) rs.close();
+ if ( selectStmt != null) selectStmt.close();
+ }
return message;
}
@@ -429,11 +457,19 @@
throws SQLException
{
String deleteSql = "delete from "+tableName+" where uuid=? and classification=?";
- PreparedStatement stmt = connection.prepareStatement(deleteSql);
- stmt.setObject(1, uid.toString());
- stmt.setObject(2, classification);
- int result = stmt.executeUpdate();
- stmt.close();
+ PreparedStatement stmt = null;
+ int result = 0;
+ try
+ {
+ stmt = connection.prepareStatement(deleteSql);
+ stmt.setObject(1, uid.toString());
+ stmt.setObject(2, classification);
+ result = stmt.executeUpdate();
+ }
+ finally
+ {
+ if ( stmt != null ) stmt.close();
+ }
return result;
}
@@ -441,20 +477,27 @@
throws SQLException, MessageStoreException
{
String sql = "insert into "+tableName+"(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
- PreparedStatement ps = conn.prepareStatement(sql);
+ PreparedStatement ps = null;
+ try
+ {
+ ps = conn.prepareStatement(sql);
- ps.setString(1, uid.toString());
- ps.setString(2, message.getType().toString());
- try {
- String messageString = Base64.encodeObject(Util.serialize(message));
- ps.setString(3, messageString);
- } catch (Exception e) {
- throw new MessageStoreException(e);
+ ps.setString(1, uid.toString());
+ ps.setString(2, message.getType().toString());
+ try {
+ String messageString = Base64.encodeObject(Util.serialize(message));
+ ps.setString(3, messageString);
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ ps.setString(4, "TRUE");
+ ps.setString(5, classification);
+ ps.execute();
}
- ps.setString(4, "TRUE");
- ps.setString(5, classification);
- ps.execute();
- ps.close();
+ finally
+ {
+ if ( ps != null ) ps.close();
+ }
}
Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java 2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -42,16 +42,19 @@
{
public final static String MESSAGE_STORE_CLASS = "message-store-class";
public final static String CLASSIFICATION_ATTR = "classification";
+ public final static String IGNORE_MSG_CLASSIFICATION_ATTR = "ignore-message-classification";
protected ConfigTree config;
protected MessageStore messageStore;
protected String classification;
private Logger log = Logger.getLogger(this.getClass());
+ private boolean ignoreMsgClassification;
public MessagePersister(ConfigTree config) throws ConfigurationException
{
this.config = config;
+ log.debug(config);
}
/**
* Persists the message to the MessageStore
@@ -61,7 +64,7 @@
String classification = this.classification;
try {
//the message can override the classification
- if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
+ if (ignoreMsgClassification != true && message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
message.getProperties().remove(MessageStore.CLASSIFICATION);
}
@@ -82,9 +85,10 @@
if (classificationValue!=null) {
classification = classificationValue;
}
+ ignoreMsgClassification = Boolean.parseBoolean(config.getAttribute( IGNORE_MSG_CLASSIFICATION_ATTR, "false") );
if (log.isDebugEnabled()) {
log.debug("MessagePersister started with classification=" + classification
- + " and message-store-class=" + messageStore);
+ + " and message-store-class=" + messageStore + ", " + IGNORE_MSG_CLASSIFICATION_ATTR + "=" + ignoreMsgClassification);
}
messageStore = MessageStoreFactory.getInstance().getMessageStore(messageStoreClass);
}
@@ -133,10 +137,11 @@
if (message.getProperties().getProperty(MessageStore.MESSAGE_URI)!=null) {
URI uid = (URI) message.getProperties().getProperty(MessageStore.MESSAGE_URI);
try {
- if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
+ if (ignoreMsgClassification != true && message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
}
//the message can override the classification
+ log.debug("Will remove uid: " + uid + ", classification: " + classification);
messageStore.removeMessage(uid, classification);
} catch (MessageStoreException mse) {
log.error("Could obtain messages.", mse);
Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java 2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -63,15 +63,18 @@
public Message process(Message message) throws ActionProcessingException
{
- classification = MessageStore.CLASSIFICATION_RDLVR;
+ String classification = MessageStore.CLASSIFICATION_RDLVR;
try {
//the message can override the classification
if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
}
Map<URI, Message> messageMap = messageStore.getAllMessages(classification);
+ int counter = 0;
for (URI uid : messageMap.keySet()) {
- redeliverStore.redeliver(uid);
+ counter++;
+ log.debug( counter + "Going to redeliver uid : "+ uid );
+ redeliverStore.redeliver(uid,classification);
}
} catch (MessageStoreException mse) {
log.error("Could not obtain messages.", mse);
Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java 2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -16,6 +16,7 @@
*/
public class RedeliverEventMessageComposer implements ScheduledEventMessageComposer
{
+ private String classification = MessageStore.CLASSIFICATION_RDLVR;
/* (non-Javadoc)
* @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#composeMessage()
@@ -23,7 +24,7 @@
public Message composeMessage() throws SchedulingException
{
Message message = org.jboss.soa.esb.message.format.MessageFactory.getInstance().getMessage();
- message.getProperties().setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR);
+ message.getProperties().setProperty(MessageStore.CLASSIFICATION, classification);
return message;
}
@@ -31,15 +32,13 @@
* @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#initialize(org.jboss.soa.esb.helpers.ConfigTree)
*/
public void initialize(ConfigTree config) throws ConfigurationException {
- // TODO Auto-generated method stub
-
+ classification = config.getAttribute( MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR );
}
/* (non-Javadoc)
* @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#onProcessingComplete(org.jboss.soa.esb.message.Message)
*/
public Message onProcessingComplete(Message message) throws SchedulingException {
- // TODO Auto-generated method stub
return null;
}
@@ -47,8 +46,11 @@
* @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#uninitialize()
*/
public void uninitialize() {
- // TODO Auto-generated method stub
-
}
+ public String getClassification()
+ {
+ return classification;
+ }
+
}
Added: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java (rev 0)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java 2007-10-10 07:39:44 UTC (rev 15712)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2006, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.schedule;
+
+import static org.jboss.soa.esb.services.persistence.MessageStore.CLASSIFICATION;
+import static org.jboss.soa.esb.services.persistence.MessageStore.CLASSIFICATION_RDLVR;
+import static org.junit.Assert.assertEquals;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.junit.Test;
+
+/**
+ *
+ * @author <a href="mailto:daniel.bevenius at gmail.com">Daniel Bevenius</a>
+ *
+ */
+public class RedeliverEventMessageComposerUnitTest
+{
+ private RedeliverEventMessageComposer composer = new RedeliverEventMessageComposer();
+ private final String customClassification = "test-classification";
+
+ @Test
+ public void classification_parameter() throws ConfigurationException
+ {
+ composer.initialize( createConfigTree( customClassification ) );
+
+ assertEquals( customClassification, composer.getClassification() );
+ }
+
+ @Test
+ public void composeMessage_default_classification() throws ConfigurationException, SchedulingException
+ {
+ composer.initialize( new ConfigTree("empty") );
+ Message message = composer.composeMessage();
+ assertEquals( CLASSIFICATION_RDLVR, message.getProperties().getProperty( CLASSIFICATION ) );
+ }
+
+ @Test
+ public void composeMessage_custom_classification() throws ConfigurationException, SchedulingException
+ {
+ composer.initialize( createConfigTree( customClassification ) );
+ Message message = composer.composeMessage();
+ assertEquals( customClassification, message.getProperties().getProperty( CLASSIFICATION ) );
+ }
+
+ private ConfigTree createConfigTree( final String customClassification )
+ {
+ ConfigTree config = new ConfigTree("junit");
+ config.setAttribute( CLASSIFICATION, customClassification );
+ return config;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(RedeliverEventMessageComposerUnitTest.class);
+ }
+
+}
More information about the jboss-svn-commits
mailing list