[jboss-svn-commits] JBL Code SVN: r8488 - in labs/jbossesb/trunk/product: core/listeners/tests/src/org/jboss/soa/esb/listeners core/rosetta/src/org/jboss/internal/soa/esb/couriers core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers core/rosetta/src/org/jboss/soa/esb/helpers/persist docs
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Dec 21 07:07:44 EST 2006
Author: mark.little at jboss.com
Date: 2006-12-21 07:07:36 -0500 (Thu, 21 Dec 2006)
New Revision: 8488
Added:
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
Modified:
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
labs/jbossesb/trunk/product/docs/ReleaseNotes.odt
Log:
Added JDBC ListenerManager unit test and made system work with HSQL. Updated ReleaseNotes to call out tested databases.
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java 2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java 2006-12-21 12:07:36 UTC (rev 8488)
@@ -0,0 +1,156 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.UUID;
+
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+public class ListenerManagerJDBCUnitTest extends ListenerManagerFileUnitTest
+{
+ public ListenerManagerJDBCUnitTest()
+ {
+ _file = "listenerJdbc.xml";
+ }
+
+ public void setUp()
+ {
+ _logger.info("Writing temp files to " + TMP_DIR);
+
+ // delete this one just to make sure asserts take the new ones
+ _returnFile.delete();
+
+ try
+ {
+ Statement stmt = getDbConnection().createStatement();
+
+ try
+ {
+ stmt.executeUpdate("DROP TABLE esb_messages");
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+
+ stmt.executeUpdate("CREATE TABLE esb_messages (message_id varchar NOT NULL, message varchar, status varchar, insert_timestamp bigint, CONSTRAINT pkey_esb_messages PRIMARY KEY (message_id))");
+ }
+ catch (SQLException ex)
+ {
+ ex.printStackTrace();
+
+ fail();
+ }
+
+ // initialize registry
+ runBeforeAllTests();
+ }
+
+ protected void oneTest () throws Exception
+ {
+ // Write wome messages to EPR obtained from configuration file
+ String configFile = getClass().getResource(_file).getFile();
+ ConfigTree tree = ConfigTree.fromInputStream(new FileInputStream(
+ configFile));
+ ConfigTree eprElement = tree.getAllChildren()[0].getFirstChild("EPR");
+
+ eprElement.setAttribute(ListenerTagNames.URL_TAG, getDbUrl());
+ eprElement.setAttribute("driver", getDbDriver());
+ eprElement.setAttribute("username", getDbUser());
+ eprElement.setAttribute("password", getDbPassword());
+
+ EPR toEPR = ListenerUtil.assembleEpr(eprElement);
+
+ if (toEPR instanceof JDBCEpr)
+ {
+ // OK, so ignore
+ }
+ else
+ fail();
+
+ String THE_TEXT = "___Config=" + _file + "___ Message Content ___";
+
+ int howMany = 10; // how many messages do you want to send before the
+ // listener comes up
+
+ DeliverOnlyCourier sender = CourierFactory.getCourier(toEPR);
+
+ Message message = MessageFactory.getInstance().getMessage();
+ message.getHeader().setCall(new Call(toEPR));
+ message.getBody().setContents(THE_TEXT.getBytes());
+
+ for (int i1 = 0; i1 < howMany; i1++)
+ {
+ URI uri = new URI(UUID.randomUUID().toString());
+ message.getHeader().getCall().setMessageID(uri);
+ sender.deliver(message);
+ }
+
+ _returnFile.delete();
+
+ // launch listener manager in a child thread
+ final ConfigTree newTree = ConfigTree.fromInputStream(new FileInputStream(
+ configFile));
+
+ eprElement = newTree.getAllChildren()[0].getFirstChild("EPR");
+
+ eprElement.setAttribute(ListenerTagNames.URL_TAG, getDbUrl());
+ eprElement.setAttribute("driver", getDbDriver());
+ eprElement.setAttribute("username", getDbUser());
+ eprElement.setAttribute("password", getDbPassword());
+
+ _manager = ListenerUtil.launchManager(newTree, true);
+ _logger.debug("Waiting for all child listeners to start");
+ _manager.waitUntilReady();
+ _logger.debug(" All child listeners ready");
+
+ // JUST FOR THIS TEST:
+ // Give your listener some time to process queued messages (see howMany
+ // above)
+ // Time allowed, and maxThreads in config file will impact how many
+ // messages
+ // will be processed, and how many will remain unprocessed
+
+ Thread.sleep(150 * howMany);
+
+ _logger.debug("going to EndRequested");
+ _manager.requestEnd();
+ _logger.debug("back from EndRequested");
+
+ assertEquals(THE_TEXT, stringFromFile(_returnFile));
+
+ _returnFile.delete();
+ }
+
+}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml 2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml 2006-12-21 12:07:36 UTC (rev 8488)
@@ -14,12 +14,12 @@
tablename="esb_messages"
/>
- <action class="org.jboss.soa.esb.listeners.ListenerManagerUnitTest$MockMessageAwareAction" process="writeToDisk" />
- <action class="org.jboss.soa.esb.actions.Notifier " okMethod="notifyOK">
- <NotificationList type="OK">
- <target class="NotifyConsole" />
- </NotificationList>
- </action>
+ <action class="org.jboss.soa.esb.listeners.ListenerManagerBaseTest$MockMessageAwareAction" process="writeToDisk" />
+ <action class="org.jboss.soa.esb.actions.Notifier" okMethod="notifyOK">
+ <NotificationList type="OK">
+ <target class="NotifyConsole" />
+ </NotificationList>
+ </action>
</DummyActionConfig>
</DummyTester>
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2006-12-21 12:07:36 UTC (rev 8488)
@@ -39,113 +39,144 @@
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXParseException;
-public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
+public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
/**
* disable default constructor
*/
- private SqlTableCourier() { }
+ private SqlTableCourier()
+ {
+ }
/**
- * package protected constructor - Objects of Courier should only be instantiated by the Factory
+ * package protected constructor - Objects of Courier should only be
+ * instantiated by the Factory
+ *
* @param epr
*/
- SqlTableCourier(JDBCEpr epr) throws CourierException { this(epr,false); }
+ SqlTableCourier(JDBCEpr epr) throws CourierException
+ {
+ this(epr, false);
+ }
/**
- * package protected constructor - Objects of Courier should only be instantiated by the Factory
+ * package protected constructor - Objects of Courier should only be
+ * instantiated by the Factory
+ *
* @param epr
*/
SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
{
_isReceiver = isReceiver;
- _epr = epr;
- _sleepForRetries = 3000;
+ _epr = epr;
+ _sleepForRetries = 3000;
try
{
- _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr.getPostDelete()));
- _errorDelete= Boolean.TRUE.equals(Boolean.valueOf(epr.getErrorDelete()));
+ _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ .getPostDelete()));
+ _errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ .getErrorDelete()));
}
- catch (URISyntaxException e) { throw new CourierException(e); }
+ catch (URISyntaxException e)
+ {
+ throw new CourierException(e);
+ }
- } //________________________________
+ } // ________________________________
- void cleanup()
+ void cleanup()
{
if (null != _conn)
- try { _conn.release(); }
- catch (Exception e)
- {
- _logger.info("Unable to release connection",e);
+ try
+ {
+ _conn.release();
}
-
- } //________________________________
+ catch (Exception e)
+ {
+ _logger.info("Unable to release connection", e);
+ }
+ } // ________________________________
+
/**
* package the ESB message in a java.io.Serializable, and write it
- * @param message Message - the message to deliver
+ *
+ * @param message
+ * Message - the message to deliver
* @return boolean - the result of the delivery
- * @throws CourierException - if problems were encountered
+ * @throws CourierException -
+ * if problems were encountered
*/
- public boolean deliver(Message message) throws CourierException
+ public boolean deliver(Message message) throws CourierException
{
if (_isReceiver)
throw new CourierException("This is a read-only Courier");
- if (null==message)
+ if (null == message)
return false;
-
-
+
String msgId = null;
try
{
Call call = message.getHeader().getCall();
- URI uri = (null==call) ? null : call.getMessageID();
- msgId = (null==uri) ? null : uri.toString();
- if (null==msgId)
- throw new CourierException ("Message ID must not be null");
+ URI uri = (null == call) ? null : call.getMessageID();
+ msgId = (null == uri) ? null : uri.toString();
+ if (null == msgId)
+ throw new CourierException("Message ID must not be null");
}
- catch (URISyntaxException e) { throw new CourierException(e); }
+ catch (URISyntaxException e)
+ {
+ throw new CourierException(e);
+ }
- if (null==_conn)
- try { _conn = getConn();}
- catch (Exception e) {throw new CourierException(e); }
+ if (null == _conn)
+ try
+ {
+ _conn = getConn();
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
- while(_conn != null)
+ while (_conn != null)
{
- try
+ try
{
int iCol = 1;
PreparedStatement PS = insertStatement();
PS.setString(iCol++, msgId);
PS.setObject(iCol++, Util.serialize(message));
PS.setString(iCol++, State.Pending.getColumnValue());
- PS.setLong (iCol++, System.currentTimeMillis());
+ PS.setLong(iCol++, System.currentTimeMillis());
_conn.execUpdWait(PS, 3);
_conn.commit();
return true;
}
- catch (SQLException e)
+ catch (SQLException e)
{
- if (null!=_conn)
- try { _conn.rollback(); }
+ if (null != _conn)
+ try
+ {
+ _conn.rollback();
+ }
catch (Exception roll)
{
_logger.error(roll);
}
- _logger.error("SQL error",e);
+ _logger.error("SQL error", e);
throw new CourierException(e);
}
- catch (Exception e)
- {
+ catch (Exception e)
+ {
jdbcConnectRetry(e);
}
}
return false;
- } //________________________________
-
- public Message pickup(long millis) throws CourierException
+ } // ________________________________
+
+ public Message pickup(long millis) throws CourierException
{
Message result = null;
long limit = System.currentTimeMillis()
@@ -155,15 +186,21 @@
ResultSet RS = getRowList();
try
{
- while (null!=RS && RS.next())
+ while (null != RS && RS.next())
{
String messageId = RS.getString(1);
- if (null==(result=tryToPickup(messageId)))
+ if (null == (result = tryToPickup(messageId)))
continue;
return result;
}
- try { Thread.sleep(200); }
- catch (InterruptedException e) { return null; }
+ try
+ {
+ Thread.sleep(200);
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
}
catch (SQLException e)
{
@@ -173,35 +210,49 @@
} while (System.currentTimeMillis() <= limit);
return null;
- } //________________________________
+ } // ________________________________
- private Message tryToPickup(String messageId) throws CourierException, SQLException
- {
- int iParm = 1;
- select4UpdateStatement().setString(iParm++, messageId);
- select4UpdateStatement().setString(iParm++, State.Pending.getColumnValue());
- while(_conn != null)
+ private Message tryToPickup(String messageId) throws CourierException,
+ SQLException
+ {
+ int iParm = 1;
+
+ select4UpdateStatement().setString(iParm++, messageId);
+ select4UpdateStatement().setString(iParm++,
+ State.Pending.getColumnValue());
+
+ while (_conn != null)
{
- try
- {
+ try
+ {
ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
while (RS.next())
{
Exception eBad = null;
try
{
- Message result = Util.deserialize((Serializable)RS.getObject(1));
+ Message result = Util.deserialize((Serializable) RS
+ .getObject(1));
if (_postDelete)
deleteMsg(messageId);
else
- changeStatus(messageId,State.Done);
+ changeStatus(messageId, State.Done);
return result;
}
- catch (ClassCastException e){ eBad = e;}
- catch (SAXParseException e){ eBad = e;}
- catch (Exception e) { throw new CourierException(e); }
- if (null!=eBad)
+ catch (ClassCastException e)
{
+ eBad = e;
+ }
+ catch (SAXParseException e)
+ {
+ eBad = e;
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ if (null != eBad)
+ {
if (_errorDelete)
deleteMsg(messageId);
else
@@ -211,164 +262,215 @@
}
return null;
}
- catch (SQLException e) { throw new CourierException(e); }
- catch (Exception e) { jdbcConnectRetry(e); }
+ catch (SQLException e)
+ {
+ throw new CourierException(e);
+ }
+ catch (Exception e)
+ {
+ jdbcConnectRetry(e);
+ }
}
- return null;
- } //________________________________
-
- private void deleteMsg(String messageId)
- throws Exception
+ return null;
+ } // ________________________________
+
+ private void deleteMsg(String messageId) throws Exception
{
int iParm = 1;
deleteStatement().setString(iParm++, messageId);
_conn.execUpdWait(deleteStatement(), 3);
_conn.commit();
-
+
}
-
- private void changeStatus(String messageId, State to)
- throws Exception
- {
- int iParm = 1;
+
+ private void changeStatus(String messageId, State to) throws Exception
+ {
+ int iParm = 1;
updateStatusStatement().setString(iParm++, to.getColumnValue());
updateStatusStatement().setString(iParm++, messageId);
- _conn.execUpdWait(updateStatusStatement(),3);
+ _conn.execUpdWait(updateStatusStatement(), 3);
_conn.commit();
-
- }
+ }
+
private ResultSet getRowList() throws CourierException
{
- if (null==_conn)
- try { _conn = getConn();}
- catch (Exception e) {throw new CourierException(e); }
- while(_conn != null)
+ if (null == _conn)
+ try
+ {
+ _conn = getConn();
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ while (_conn != null)
{
- try { return _conn.execQueryWait(listStatement(), 3); }
- catch (Exception e) {jdbcConnectRetry(e); }
+ try
+ {
+ return _conn.execQueryWait(listStatement(), 3);
+ }
+ catch (Exception e)
+ {
+ jdbcConnectRetry(e);
+ }
}
return null;
-
- } //_______________________________
-
- private void jdbcConnectRetry(Exception exc)
- {
- _logger.error("DB problem, will try to reconnect",exc);
- if (null!=_conn)
- _conn.release();
- _conn = null;
- _prepDelete =
- _prepGetList =
- _prepInsert =
- _prepSel4Upd =
- _prepUpdateStatus
- = null;
- for (int i1=0; i1<3; i1++)
- {
- try { _conn = getConn(); }
- catch (Exception e)
- {
- try {Thread.sleep(_sleepForRetries); }
- catch (InterruptedException eInt) { return; }
- }
- }
- } //________________________________
-
- private JdbcCleanConn getConn() throws Exception
- {
- if (null==_conn)
- {
- SimpleDataSource DS = new SimpleDataSource
- (_epr.getDriver(),_epr.getURL(),_epr.getUserName(), _epr.getPassword());
- _conn = new JdbcCleanConn(DS);
- }
- return _conn;
- } //________________________________
-
- protected PreparedStatement listStatement()
- {
- if (null==_prepGetList)
-
- try
- {
- String[] columns = {_epr.getMessageIdColumn(),_epr.getTimestampColumn()};
-
- StringBuilder sb = new StringBuilder ("select");
- int i1 = 0;
- for (String col:columns)
- sb.append((i1++<1)?" ":",").append(col);
- sb.append(" from ").append(_epr.getTableName());
- sb.append(" where ").append(_epr.getStatusColumn())
- .append("='").append(State.Pending.getColumnValue()).append("'")
- .append(" order by 2");
- ;
- _prepGetList = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.error("Unable to prepare SQL statement",e);
- return null;
- }
- return _prepGetList;
- } //________________________________
+ } // _______________________________
- protected PreparedStatement select4UpdateStatement()
- {
- if (null==_prepSel4Upd)
- try
- {
- StringBuilder sb = new StringBuilder ("select ")
- .append(_epr.getDataColumn()).append(" from ") .append(_epr.getTableName())
- .append(" where ").append(_epr.getMessageIdColumn()).append("=?")
- .append(" and ") .append(_epr.getStatusColumn()) .append("=?")
- .append(" for update")
- ;
- _prepSel4Upd = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.error(e);
- return null;
- }
- return _prepSel4Upd;
- } //________________________________
+ private void jdbcConnectRetry(Exception exc)
+ {
+ _logger.error("DB problem, will try to reconnect", exc);
+ if (null != _conn)
+ _conn.release();
+ _conn = null;
- protected PreparedStatement updateStatusStatement()
- {
- if (null==_prepUpdateStatus)
- try
- {
- StringBuilder sb = new StringBuilder ("update ").append(_epr.getTableName())
- .append(" set ").append(_epr.getStatusColumn()) .append("= ?")
- .append(" where ").append(_epr.getMessageIdColumn()).append("=?")
- ;
- _prepUpdateStatus = getConn().prepareStatement(sb.toString());
+ _prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
+ for (int i1 = 0; i1 < 3; i1++)
+ {
+ try
+ {
+ _conn = getConn();
}
catch (Exception e)
{
+ try
+ {
+ Thread.sleep(_sleepForRetries);
+ }
+ catch (InterruptedException eInt)
+ {
+ return;
+ }
+ }
+ }
+ } // ________________________________
+
+ private JdbcCleanConn getConn() throws Exception
+ {
+ if (null == _conn)
+ {
+ SimpleDataSource DS = new SimpleDataSource(_epr.getDriver(), _epr
+ .getURL(), _epr.getUserName(), _epr.getPassword());
+ _conn = new JdbcCleanConn(DS);
+ }
+ return _conn;
+ } // ________________________________
+
+ protected PreparedStatement listStatement()
+ {
+ if (null == _prepGetList)
+
+ try
+ {
+ String[] columns =
+ { _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
+
+ StringBuilder sb = new StringBuilder("select");
+ int i1 = 0;
+ for (String col : columns)
+ sb.append((i1++ < 1) ? " " : ",").append(col);
+ sb.append(" from ").append(_epr.getTableName());
+ sb.append(" where ").append(_epr.getStatusColumn())
+ .append("='").append(State.Pending.getColumnValue())
+ .append("'").append(" order by 2");
+ ;
+ _prepGetList = getConn().prepareStatement(sb.toString());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to prepare SQL statement", e);
+ return null;
+ }
+ return _prepGetList;
+ } // ________________________________
+
+ protected PreparedStatement select4UpdateStatement()
+ {
+ if (_prepSel4Upd == null)
+ {
+ try
+ {
+ /*
+ * TODO make this dynamic using a factory pattern.
+ */
+
+ StringBuilder sb = null;
+
+ if (!_epr.getURL().contains("hsqldb"))
+ {
+ sb = new StringBuilder("select ").append(
+ _epr.getDataColumn()).append(" from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(_epr.getStatusColumn())
+ .append("=?").append(" for update");
+ }
+ else
+ {
+ /*
+ * HSQL does not support FOR UPDATE! All tables appear to
+ * be inherently updatable!
+ */
+
+ sb = new StringBuilder("select ").append(
+ _epr.getDataColumn()).append(" from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(_epr.getStatusColumn())
+ .append("=?");
+ }
+
+ _prepSel4Upd = getConn().prepareStatement(sb.toString());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
_logger.error(e);
return null;
}
+ }
+
+ return _prepSel4Upd;
+ } // ________________________________
+
+ protected PreparedStatement updateStatusStatement()
+ {
+ if (null == _prepUpdateStatus)
+ try
+ {
+ StringBuilder sb = new StringBuilder("update ").append(
+ _epr.getTableName()).append(" set ").append(
+ _epr.getStatusColumn()).append("= ?").append(" where ")
+ .append(_epr.getMessageIdColumn()).append("=?");
+ _prepUpdateStatus = getConn().prepareStatement(sb.toString());
+ }
+ catch (Exception e)
+ {
+ _logger.error(e);
+ return null;
+ }
return _prepUpdateStatus;
- } //________________________________
+ } // ________________________________
- protected PreparedStatement insertStatement()
- {
- if (null==_prepInsert)
- try
- {
- String[] columns = {_epr.getMessageIdColumn() ,_epr.getDataColumn()
- ,_epr.getStatusColumn() ,_epr.getTimestampColumn()};
-
- StringBuilder sb = new StringBuilder ("insert into ").append(_epr.getTableName())
- .append("(");
- int i1 = 0;
- for (String col:columns)
- sb.append((i1++<1)?" ":",").append(col);
- sb.append(") values (?,?,?,?)");
- _prepInsert = getConn().prepareStatement(sb.toString());
+ protected PreparedStatement insertStatement()
+ {
+ if (null == _prepInsert)
+ try
+ {
+ String[] columns =
+ { _epr.getMessageIdColumn(), _epr.getDataColumn(),
+ _epr.getStatusColumn(), _epr.getTimestampColumn() };
+
+ StringBuilder sb = new StringBuilder("insert into ").append(
+ _epr.getTableName()).append("(");
+ int i1 = 0;
+ for (String col : columns)
+ sb.append((i1++ < 1) ? " " : ",").append(col);
+ sb.append(") values (?,?,?,?)");
+ _prepInsert = getConn().prepareStatement(sb.toString());
}
catch (Exception e)
{
@@ -376,16 +478,17 @@
return null;
}
return _prepInsert;
- } //________________________________
+ } // ________________________________
- protected PreparedStatement deleteStatement()
- {
- if (null==_prepDelete)
- try
- {
- StringBuilder sb = new StringBuilder ("delete from ").append(_epr.getTableName())
- .append(" where ").append(_epr.getMessageIdColumn()).append(" =?");
- _prepDelete = getConn().prepareStatement(sb.toString());
+ protected PreparedStatement deleteStatement()
+ {
+ if (null == _prepDelete)
+ try
+ {
+ StringBuilder sb = new StringBuilder("delete from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append(" =?");
+ _prepDelete = getConn().prepareStatement(sb.toString());
}
catch (Exception e)
{
@@ -393,25 +496,36 @@
return null;
}
return _prepDelete;
- } //________________________________
+ } // ________________________________
- protected enum State
- { Pending, WorkInProgress, Done, Error;
- String getColumnValue() {return toString().substring(0,1); }
- };
+ protected enum State
+ {
+ Pending, WorkInProgress, Done, Error;
+ String getColumnValue()
+ {
+ return toString().substring(0, 1);
+ }
+ };
- protected long _sleepForRetries = 3000; // milliseconds
+ protected long _sleepForRetries = 3000; // milliseconds
- protected boolean _postDelete ,_errorDelete;
- protected boolean _isReceiver;
- protected JDBCEpr _epr;
- protected JdbcCleanConn _conn;
- protected PreparedStatement _prepGetList;;
- protected PreparedStatement _prepSel4Upd;
- protected PreparedStatement _prepUpdateStatus;
- protected PreparedStatement _prepInsert;
- protected PreparedStatement _prepDelete;
-
+ protected boolean _postDelete, _errorDelete;
- protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+ protected boolean _isReceiver;
+
+ protected JDBCEpr _epr;
+
+ protected JdbcCleanConn _conn;
+
+ protected PreparedStatement _prepGetList;;
+
+ protected PreparedStatement _prepSel4Upd;
+
+ protected PreparedStatement _prepUpdateStatus;
+
+ protected PreparedStatement _prepInsert;
+
+ protected PreparedStatement _prepDelete;
+
+ protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java 2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java 2006-12-21 12:07:36 UTC (rev 8488)
@@ -14,17 +14,23 @@
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.util.Util;
-public class LocalFileHandler implements FileHandler
+public class LocalFileHandler implements FileHandler
{
- private LocalFileHandler() {}
- LocalFileHandler(FileEpr epr) {_epr = epr; }
+ private LocalFileHandler()
+ {
+ }
- public boolean deleteFile(File file) throws CourierException
+ LocalFileHandler(FileEpr epr)
{
+ _epr = epr;
+ }
+
+ public boolean deleteFile(File file) throws CourierException
+ {
return file.delete();
}
- public byte[] getFileContents(File file) throws CourierException
+ public byte[] getFileContents(File file) throws CourierException
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] ba = new byte[1000];
@@ -33,66 +39,93 @@
try
{
FileInputStream inp = new FileInputStream(file);
- while (-1!= (iQread=inp.read(ba)))
+ while (-1 != (iQread = inp.read(ba)))
if (iQread > 0)
- out.write(ba,0,iQread);
+ out.write(ba, 0, iQread);
inp.close();
out.close();
return out.toByteArray();
}
- catch(FileNotFoundException e) {throw new CourierException(e);}
- catch (IOException e) {throw new CourierException(e);}
+ catch (FileNotFoundException e)
+ {
+ throw new CourierException(e);
+ }
+ catch (IOException e)
+ {
+ throw new CourierException(e);
+ }
- // Just in case... (comment next line out to see if you're missing something)
- catch (Exception e) {throw new CourierException(e);}
+ // Just in case... (comment next line out to see if you're missing
+ // something)
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
}
- public File[] getFileList() throws CourierException
+ public File[] getFileList() throws CourierException
{
try
{
File dir = new File(_epr.getURL().getFile());
- if (! dir.isDirectory())
- throw new CourierException("Can't get file list if URL is not a directory");
+ if (!dir.isDirectory())
+ throw new CourierException(
+ "Can't get file list if URL is not a directory");
FileFilter filter = new FileEndsWith(_epr.getInputSuffix());
return dir.listFiles(filter);
}
- catch (URISyntaxException e) {throw new CourierException(e);}
- catch (MalformedURLException e) {throw new CourierException(e);}
- // Just in case... (comment next line out to see if you're missing something)
- catch (Exception e) {throw new CourierException(e);}
+ catch (URISyntaxException e)
+ {
+ throw new CourierException(e);
+ }
+ catch (MalformedURLException e)
+ {
+ throw new CourierException(e);
+ }
+ // Just in case... (comment next line out to see if you're missing
+ // something)
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
}
- public boolean renameFile(File from, File to) throws CourierException
+ public boolean renameFile(File from, File to) throws CourierException
{
try
- {
+ {
to.delete();
if (!from.renameTo(to))
- throw new CourierException("Unable to rename from "+from+" to "+to);
+ throw new CourierException("Unable to rename from " + from
+ + " to " + to);
return true;
}
- catch (Exception e) {throw new CourierException(e);}
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
}
private class FileEndsWith implements FileFilter
- {
- String m_sSuffix;
- FileEndsWith(String p_sEnd) throws CourierException
- {
- m_sSuffix = p_sEnd;
- if (Util.isNullString(m_sSuffix))
- throw new CourierException("A file suffix (or full Message id) must be specified for pickup");
- } //______________________________
+ {
+ String m_sSuffix;
- public boolean accept(File p_f)
- { return (p_f.isFile())
- ? p_f.toString().endsWith(m_sSuffix)
- : false;
- } //______________________________
- } //____________________________________________________
-
+ FileEndsWith(String p_sEnd) throws CourierException
+ {
+ m_sSuffix = p_sEnd;
+ if (Util.isNullString(m_sSuffix))
+ throw new CourierException(
+ "A file suffix (or full Message id) must be specified for pickup");
+ } // ______________________________
+
+ public boolean accept(File p_f)
+ {
+ return (p_f.isFile()) ? p_f.toString().endsWith(m_sSuffix) : false;
+ } // ______________________________
+ } // ____________________________________________________
+
protected FileEpr _epr;
+
static Logger _logger = Logger.getLogger(LocalFileHandler.class);
}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2006-12-21 12:07:36 UTC (rev 8488)
@@ -32,7 +32,8 @@
import org.apache.log4j.Logger;
-public class JdbcCleanConn {
+public class JdbcCleanConn
+{
private DataSource m_oDS = null;
private Connection m_conn = null;
@@ -41,78 +42,107 @@
protected Logger m_oLogger;
- public JdbcCleanConn(DataSource p_oDS) throws Exception {
+ public JdbcCleanConn(DataSource p_oDS) throws Exception
+ {
m_oDS = p_oDS;
m_oLogger = Logger.getLogger(this.getClass());
}
- public void commit() throws Exception {
- if (null != m_conn) {
+ public void commit() throws Exception
+ {
+ if (null != m_conn)
+ {
m_conn.commit();
- }
+ }
}
- public void rollback() throws Exception {
- if (null != m_conn) {
+ public void rollback() throws Exception
+ {
+ if (null != m_conn)
+ {
m_conn.rollback();
- }
+ }
}
-
- public void release() {
- if (null != m_conn) {
- try {
+
+ public void release()
+ {
+ if (null != m_conn)
+ {
+ try
+ {
m_conn.rollback();
- } catch (Exception eRoll) {
}
+ catch (Exception eRoll)
+ {
+ }
for (PreparedStatement PS : m_olPrepSt)
- try {
+ try
+ {
PS.close();
- } catch (Exception e) {
}
- try {
+ catch (Exception e)
+ {
+ }
+ try
+ {
m_conn.close();
- } catch (Exception e1) {
}
+ catch (Exception e1)
+ {
+ }
}
m_olPrepSt.clear();
m_conn = null;
} // __________________________________
public PreparedStatement prepareStatement(String p_sSt, int p_i1, int p_i2)
- throws Exception {
- if (null == m_conn) {
- connect();
- }
+ throws Exception
+ {
+ if (null == m_conn)
+ {
+ connect();
+ }
PreparedStatement PS = m_conn.prepareStatement(p_sSt, p_i1, p_i2);
m_olPrepSt.add(PS);
return PS;
} // __________________________________
- public PreparedStatement prepareStatement(String p_sSt) throws Exception {
- if (null == m_conn) {
+ public PreparedStatement prepareStatement(String p_sSt) throws Exception
+ {
+ if (null == m_conn)
+ {
connect();
- }
-
+ }
+
PreparedStatement PS = m_conn.prepareStatement(p_sSt);
m_olPrepSt.add(PS);
return PS;
} // __________________________________
- public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
- if (null == m_conn) {
- connect();
- }
-
+ public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry)
+ throws Exception
+ {
+ if (null == m_conn)
+ {
+ connect();
+ }
+
Exception eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
- for (int i1 = 0; i1 < iQtry; i1++) {
- try {
+ for (int i1 = 0; i1 < iQtry; i1++)
+ {
+ try
+ {
return p_PS.executeQuery();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
if (null == eRet)
eRet = new Exception(e.getMessage());
- // System.out.println("Retrying "+i1);
+
+ // TODO magic number!!
+
Thread.sleep(100 + (new Double(100 * Math.random()))
.longValue());
}
@@ -121,18 +151,25 @@
throw eRet;
} // __________________________________
- public void execUpdWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
- if (null == m_conn) {
- connect();
- }
-
+ public void execUpdWait(PreparedStatement p_PS, int p_iQtry)
+ throws Exception
+ {
+ if (null == m_conn)
+ {
+ connect();
+ }
+
Exception eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
- for (int i1 = 0; i1 < iQtry; i1++) {
- try {
+ for (int i1 = 0; i1 < iQtry; i1++)
+ {
+ try
+ {
p_PS.executeUpdate();
return;
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
if (null == eRet)
eRet = e;
// System.out.println("Retrying "+i1);
@@ -144,29 +181,47 @@
throw eRet;
} // __________________________________
- private void connect() throws Exception {
- if(m_conn != null) {
- return;
- }
-
+ private void connect() throws Exception
+ {
+ if (m_conn != null)
+ {
+ return;
+ }
+
Exception eRet = null;
- for (int i1 = 0; i1 < 5; i1++) {
- try {
+ for (int i1 = 0; i1 < 5; i1++)
+ {
+ try
+ {
m_conn = m_oDS.getConnection();
eRet = null;
break;
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
if (null == eRet)
eRet = e;
- System.out.println("Connecting " + i1);
+
+ // TODO magic number!!
+
Thread.sleep(2000 + (new Double(100 * Math.random()))
.longValue());
}
}
- if (null != eRet) {
+
+ if (eRet != null)
+ {
m_oLogger.error("connect() FAILED", eRet);
throw eRet;
}
+
+ if (m_conn == null)
+ {
+ m_oLogger.error("connect() FAILED: no connection");
+
+ throw new RuntimeException();
+ }
+
m_conn.setAutoCommit(false);
m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
Modified: labs/jbossesb/trunk/product/docs/ReleaseNotes.odt
===================================================================
(Binary files differ)
More information about the jboss-svn-commits
mailing list