[jboss-svn-commits] JBL Code SVN: r8488 - in labs/jbossesb/trunk/product: core/listeners/tests/src/org/jboss/soa/esb/listeners core/rosetta/src/org/jboss/internal/soa/esb/couriers core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers core/rosetta/src/org/jboss/soa/esb/helpers/persist docs

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Dec 21 07:07:44 EST 2006


Author: mark.little at jboss.com
Date: 2006-12-21 07:07:36 -0500 (Thu, 21 Dec 2006)
New Revision: 8488

Added:
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
   labs/jbossesb/trunk/product/docs/ReleaseNotes.odt
Log:
Added JDBC ListenerManager unit test and made system work with HSQL. Updated ReleaseNotes to call out tested databases.

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java	2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java	2006-12-21 12:07:36 UTC (rev 8488)
@@ -0,0 +1,156 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.UUID;
+
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+public class ListenerManagerJDBCUnitTest extends ListenerManagerFileUnitTest
+{
+	public ListenerManagerJDBCUnitTest()
+	{
+		_file = "listenerJdbc.xml";
+	}
+
+	public void setUp()
+	{
+		_logger.info("Writing temp files to " + TMP_DIR);
+
+		// delete this one just to make sure asserts take the new ones
+		_returnFile.delete();
+
+		try
+		{
+			Statement stmt = getDbConnection().createStatement();
+
+			try
+			{
+				stmt.executeUpdate("DROP TABLE esb_messages");
+			}
+			catch (Exception e)
+			{
+				// Ignore
+			}
+
+			stmt.executeUpdate("CREATE TABLE esb_messages (message_id varchar NOT NULL, message varchar, status varchar, insert_timestamp bigint, CONSTRAINT pkey_esb_messages PRIMARY KEY (message_id))");
+		}
+		catch (SQLException ex)
+		{
+			ex.printStackTrace();
+
+			fail();
+		}
+
+		// initialize registry
+		runBeforeAllTests();
+	}
+	
+	protected void oneTest () throws Exception
+	{
+		// Write wome messages to EPR obtained from configuration file
+		String configFile = getClass().getResource(_file).getFile();
+		ConfigTree tree = ConfigTree.fromInputStream(new FileInputStream(
+				configFile));
+		ConfigTree eprElement = tree.getAllChildren()[0].getFirstChild("EPR");
+		
+		eprElement.setAttribute(ListenerTagNames.URL_TAG, getDbUrl());
+		eprElement.setAttribute("driver", getDbDriver());
+		eprElement.setAttribute("username", getDbUser());
+		eprElement.setAttribute("password", getDbPassword());
+		
+		EPR toEPR = ListenerUtil.assembleEpr(eprElement);
+
+		if (toEPR instanceof JDBCEpr)
+		{
+			// OK, so ignore
+		}
+		else
+			fail();
+		
+		String THE_TEXT = "___Config=" + _file + "___ Message Content ___";
+
+		int howMany = 10; // how many messages do you want to send before the
+							// listener comes up
+		
+		DeliverOnlyCourier sender = CourierFactory.getCourier(toEPR);
+		
+		Message message = MessageFactory.getInstance().getMessage();
+		message.getHeader().setCall(new Call(toEPR));
+		message.getBody().setContents(THE_TEXT.getBytes());
+		
+		for (int i1 = 0; i1 < howMany; i1++)
+		{
+			URI uri = new URI(UUID.randomUUID().toString());
+			message.getHeader().getCall().setMessageID(uri);
+			sender.deliver(message);
+		}
+
+		_returnFile.delete();
+		
+		//		 launch listener manager in a child thread
+		final ConfigTree newTree = ConfigTree.fromInputStream(new FileInputStream(
+				configFile));
+		
+		eprElement = newTree.getAllChildren()[0].getFirstChild("EPR");
+		
+		eprElement.setAttribute(ListenerTagNames.URL_TAG, getDbUrl());
+		eprElement.setAttribute("driver", getDbDriver());
+		eprElement.setAttribute("username", getDbUser());
+		eprElement.setAttribute("password", getDbPassword());
+
+		_manager = ListenerUtil.launchManager(newTree, true);
+		_logger.debug("Waiting for all child listeners to start");
+		_manager.waitUntilReady();
+		_logger.debug(" All child listeners ready");
+
+		// JUST FOR THIS TEST:
+		// Give your listener some time to process queued messages (see howMany
+		// above)
+		// Time allowed, and maxThreads in config file will impact how many
+		// messages
+		// will be processed, and how many will remain unprocessed
+		
+		Thread.sleep(150 * howMany);
+
+		_logger.debug("going to EndRequested");
+		_manager.requestEnd();
+		_logger.debug("back from EndRequested");
+
+		assertEquals(THE_TEXT, stringFromFile(_returnFile));
+		
+		_returnFile.delete();
+	}
+
+}
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml	2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/listenerJdbc.xml	2006-12-21 12:07:36 UTC (rev 8488)
@@ -14,12 +14,12 @@
             tablename="esb_messages" 
         />
         
-        <action class="org.jboss.soa.esb.listeners.ListenerManagerUnitTest$MockMessageAwareAction" process="writeToDisk" />
-        <action  class="org.jboss.soa.esb.actions.Notifier "  okMethod="notifyOK">
-               <NotificationList type="OK"> 
-                <target class="NotifyConsole" /> 
-            </NotificationList> 
-        </action> 
+        <action class="org.jboss.soa.esb.listeners.ListenerManagerBaseTest$MockMessageAwareAction" process="writeToDisk" />
+		<action  class="org.jboss.soa.esb.actions.Notifier"  okMethod="notifyOK">
+		   	<NotificationList type="OK"> 
+				<target class="NotifyConsole" /> 
+			</NotificationList> 
+		</action> 
    </DummyActionConfig>
 </DummyTester>
 

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-21 12:07:36 UTC (rev 8488)
@@ -39,113 +39,144 @@
 import org.jboss.soa.esb.util.Util;
 import org.xml.sax.SAXParseException;
 
-public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier 
+public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
 	/**
 	 * disable default constructor
 	 */
-	private SqlTableCourier() { }
+	private SqlTableCourier()
+	{
+	}
 
 	/**
-	 * package protected constructor - Objects of Courier should only be instantiated by the Factory
+	 * package protected constructor - Objects of Courier should only be
+	 * instantiated by the Factory
+	 * 
 	 * @param epr
 	 */
-	SqlTableCourier(JDBCEpr epr) throws CourierException { this(epr,false); }
+	SqlTableCourier(JDBCEpr epr) throws CourierException
+	{
+		this(epr, false);
+	}
 
 	/**
-	 * package protected constructor - Objects of Courier should only be instantiated by the Factory
+	 * package protected constructor - Objects of Courier should only be
+	 * instantiated by the Factory
+	 * 
 	 * @param epr
 	 */
 	SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
 	{
 		_isReceiver = isReceiver;
-		_epr		= epr;
-		_sleepForRetries	= 3000;
+		_epr = epr;
+		_sleepForRetries = 3000;
 		try
 		{
-			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr.getPostDelete()));
-			_errorDelete= Boolean.TRUE.equals(Boolean.valueOf(epr.getErrorDelete()));
+			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+					.getPostDelete()));
+			_errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+					.getErrorDelete()));
 		}
-		catch (URISyntaxException e) { throw new CourierException(e); }
+		catch (URISyntaxException e)
+		{
+			throw new CourierException(e);
+		}
 
-	} //________________________________
+	} // ________________________________
 
-	void cleanup() 
+	void cleanup()
 	{
 		if (null != _conn)
-			try { _conn.release(); }
-			catch (Exception e) 
-			{ 
-				_logger.info("Unable to release connection",e);
+			try
+			{
+				_conn.release();
 			}
-		
-    } //________________________________
+			catch (Exception e)
+			{
+				_logger.info("Unable to release connection", e);
+			}
 
+	} // ________________________________
+
 	/**
 	 * package the ESB message in a java.io.Serializable, and write it
-	 * @param message Message - the message to deliver 
+	 * 
+	 * @param message
+	 *            Message - the message to deliver
 	 * @return boolean - the result of the delivery
-	 * @throws CourierException - if problems were encountered
+	 * @throws CourierException -
+	 *             if problems were encountered
 	 */
-	public boolean deliver(Message message) throws CourierException 
+	public boolean deliver(Message message) throws CourierException
 	{
 		if (_isReceiver)
 			throw new CourierException("This is a read-only Courier");
 
-		if (null==message)
+		if (null == message)
 			return false;
-		
-		
+
 		String msgId = null;
 		try
 		{
 			Call call = message.getHeader().getCall();
-			URI  uri  = (null==call) ? null : call.getMessageID();
-			msgId = (null==uri) ? null : uri.toString();
-			if (null==msgId)
-				throw new CourierException ("Message ID must not be null");
+			URI uri = (null == call) ? null : call.getMessageID();
+			msgId = (null == uri) ? null : uri.toString();
+			if (null == msgId)
+				throw new CourierException("Message ID must not be null");
 		}
-		catch (URISyntaxException e) { throw new CourierException(e); }
+		catch (URISyntaxException e)
+		{
+			throw new CourierException(e);
+		}
 
-		if (null==_conn)
-			try {  _conn = getConn();}
-			catch (Exception e) {throw new CourierException(e); }
+		if (null == _conn)
+			try
+			{
+				_conn = getConn();
+			}
+			catch (Exception e)
+			{
+				throw new CourierException(e);
+			}
 
-		while(_conn != null)
+		while (_conn != null)
 		{
-			try 
+			try
 			{
 				int iCol = 1;
 				PreparedStatement PS = insertStatement();
 				PS.setString(iCol++, msgId);
 				PS.setObject(iCol++, Util.serialize(message));
 				PS.setString(iCol++, State.Pending.getColumnValue());
-				PS.setLong	(iCol++, System.currentTimeMillis());
+				PS.setLong(iCol++, System.currentTimeMillis());
 
 				_conn.execUpdWait(PS, 3);
 				_conn.commit();
 				return true;
 			}
-			catch (SQLException e)	
+			catch (SQLException e)
 			{
-				if (null!=_conn)
-					try { _conn.rollback(); }
+				if (null != _conn)
+					try
+					{
+						_conn.rollback();
+					}
 					catch (Exception roll)
 					{
 						_logger.error(roll);
 					}
-				_logger.error("SQL error",e);
+				_logger.error("SQL error", e);
 				throw new CourierException(e);
 			}
-			catch (Exception e)		
-			{ 
+			catch (Exception e)
+			{
 				jdbcConnectRetry(e);
 			}
 		}
 		return false;
-	} //________________________________
-	
-	public Message pickup(long millis) throws CourierException 
+	} // ________________________________
+
+	public Message pickup(long millis) throws CourierException
 	{
 		Message result = null;
 		long limit = System.currentTimeMillis()
@@ -155,15 +186,21 @@
 			ResultSet RS = getRowList();
 			try
 			{
-				while (null!=RS && RS.next())
+				while (null != RS && RS.next())
 				{
 					String messageId = RS.getString(1);
-					if (null==(result=tryToPickup(messageId)))
+					if (null == (result = tryToPickup(messageId)))
 						continue;
 					return result;
 				}
-				try { Thread.sleep(200); }
-				catch (InterruptedException e) { return null; }
+				try
+				{
+					Thread.sleep(200);
+				}
+				catch (InterruptedException e)
+				{
+					return null;
+				}
 			}
 			catch (SQLException e)
 			{
@@ -173,35 +210,49 @@
 
 		} while (System.currentTimeMillis() <= limit);
 		return null;
-    } //________________________________
+	} // ________________________________
 
-    private Message tryToPickup(String messageId) throws CourierException, SQLException
-    {
-    	int iParm = 1;
-    	select4UpdateStatement().setString(iParm++, messageId);
-    	select4UpdateStatement().setString(iParm++, State.Pending.getColumnValue());
-		while(_conn != null)
+	private Message tryToPickup(String messageId) throws CourierException,
+			SQLException
+	{
+		int iParm = 1;
+
+		select4UpdateStatement().setString(iParm++, messageId);
+		select4UpdateStatement().setString(iParm++,
+				State.Pending.getColumnValue());
+
+		while (_conn != null)
 		{
-			try 
-			{ 
+			try
+			{
 				ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
 				while (RS.next())
 				{
 					Exception eBad = null;
 					try
 					{
-						Message result = Util.deserialize((Serializable)RS.getObject(1));
+						Message result = Util.deserialize((Serializable) RS
+								.getObject(1));
 						if (_postDelete)
 							deleteMsg(messageId);
 						else
-							changeStatus(messageId,State.Done);
+							changeStatus(messageId, State.Done);
 						return result;
 					}
-					catch (ClassCastException e){ eBad = e;}
-					catch (SAXParseException  e){ eBad = e;}
-					catch (Exception e) { throw new CourierException(e); }
-					if (null!=eBad)
+					catch (ClassCastException e)
 					{
+						eBad = e;
+					}
+					catch (SAXParseException e)
+					{
+						eBad = e;
+					}
+					catch (Exception e)
+					{
+						throw new CourierException(e);
+					}
+					if (null != eBad)
+					{
 						if (_errorDelete)
 							deleteMsg(messageId);
 						else
@@ -211,164 +262,215 @@
 				}
 				return null;
 			}
-			catch (SQLException e)	{ throw new CourierException(e); }
-			catch (Exception e)  	{ jdbcConnectRetry(e); }
+			catch (SQLException e)
+			{
+				throw new CourierException(e);
+			}
+			catch (Exception e)
+			{
+				jdbcConnectRetry(e);
+			}
 		}
-    	return null;
-    } //________________________________
-    
-    private void deleteMsg(String messageId)
-		throws Exception
+		return null;
+	} // ________________________________
+
+	private void deleteMsg(String messageId) throws Exception
 	{
 		int iParm = 1;
 		deleteStatement().setString(iParm++, messageId);
 		_conn.execUpdWait(deleteStatement(), 3);
 		_conn.commit();
-		
+
 	}
-	
-    private void changeStatus(String messageId, State to)
-    	throws Exception
-    {
-    	int iParm = 1;
+
+	private void changeStatus(String messageId, State to) throws Exception
+	{
+		int iParm = 1;
 		updateStatusStatement().setString(iParm++, to.getColumnValue());
 		updateStatusStatement().setString(iParm++, messageId);
-		_conn.execUpdWait(updateStatusStatement(),3);
+		_conn.execUpdWait(updateStatusStatement(), 3);
 		_conn.commit();
-    	
-    }
 
+	}
+
 	private ResultSet getRowList() throws CourierException
 	{
-		if (null==_conn)
-			try {  _conn = getConn();}
-			catch (Exception e) {throw new CourierException(e); }
-		while(_conn != null)
+		if (null == _conn)
+			try
+			{
+				_conn = getConn();
+			}
+			catch (Exception e)
+			{
+				throw new CourierException(e);
+			}
+		while (_conn != null)
 		{
-			try { return _conn.execQueryWait(listStatement(), 3); }
-			catch (Exception e)  {jdbcConnectRetry(e); }
+			try
+			{
+				return _conn.execQueryWait(listStatement(), 3);
+			}
+			catch (Exception e)
+			{
+				jdbcConnectRetry(e);
+			}
 		}
 		return null;
-		
-	} //_______________________________
-	
-    private void jdbcConnectRetry(Exception exc)
-    {
-    	_logger.error("DB problem, will try to reconnect",exc);
-    	if (null!=_conn)
-    		_conn.release();
-    	_conn	= null;
 
-    	_prepDelete = 
-    	_prepGetList = 
-    	_prepInsert = 
-    	_prepSel4Upd = 
-    	_prepUpdateStatus 
-    		= null;
-    	for (int i1=0; i1<3; i1++)
-    	{
-    		try { _conn = getConn(); }
-    		catch (Exception e)
-    		{
-    			try {Thread.sleep(_sleepForRetries); }
-    			catch (InterruptedException eInt) { return; }
-    		}
-    	}
-    } //________________________________
-    
-    private JdbcCleanConn getConn() throws Exception
-    {
-    	if (null==_conn)
-    	{
-    		SimpleDataSource DS = new SimpleDataSource
-    			(_epr.getDriver(),_epr.getURL(),_epr.getUserName(), _epr.getPassword());
-    		_conn = new JdbcCleanConn(DS);
-    	}
-    	return _conn;
-    } //________________________________
-    
-    protected PreparedStatement listStatement()
-    {
-    	if (null==_prepGetList)
-    		
-	    	try
-	    	{
-		        String[] columns =	{_epr.getMessageIdColumn(),_epr.getTimestampColumn()};
-		
-		    	StringBuilder sb = new StringBuilder ("select");
-		    	int i1 = 0;
-		    	for (String col:columns)
-		    		sb.append((i1++<1)?" ":",").append(col);
-		    	sb.append(" from ").append(_epr.getTableName());
-		    	sb.append(" where ").append(_epr.getStatusColumn())
-		    		.append("='").append(State.Pending.getColumnValue()).append("'")
-		    		.append(" order by 2");
-		    	;
-		    	_prepGetList = getConn().prepareStatement(sb.toString());
-	    	}
-	    	catch (Exception e)
-	    	{
-	    		_logger.error("Unable to prepare SQL statement",e);
-	    		return null;
-	    	}
-	    return _prepGetList;
-    } //________________________________
+	} // _______________________________
 
-    protected PreparedStatement select4UpdateStatement()
-    {
-    	if (null==_prepSel4Upd)
-	    	try
-	    	{
-		    	StringBuilder sb = new StringBuilder ("select ")
-					.append(_epr.getDataColumn()).append(" from ") .append(_epr.getTableName())
-					.append(" where ").append(_epr.getMessageIdColumn()).append("=?")
-					.append(" and ")  .append(_epr.getStatusColumn())	.append("=?")
-		    		.append(" for update")
-		    	;
-		    	_prepSel4Upd = getConn().prepareStatement(sb.toString()); 
-	    	}
-	    	catch (Exception e)
-	    	{
-	    		_logger.error(e);
-	    		return null;
-	    	}
-    	return _prepSel4Upd;
-    } //________________________________
+	private void jdbcConnectRetry(Exception exc)
+	{
+		_logger.error("DB problem, will try to reconnect", exc);
+		if (null != _conn)
+			_conn.release();
+		_conn = null;
 
-    protected PreparedStatement updateStatusStatement()
-    {
-    	if (null==_prepUpdateStatus)
-	    	try
-	    	{
-		    	StringBuilder sb = new StringBuilder ("update ").append(_epr.getTableName())
-		    		.append(" set ").append(_epr.getStatusColumn())		.append("= ?")
-		    		.append(" where ").append(_epr.getMessageIdColumn()).append("=?")
-		    	;
-		    	_prepUpdateStatus = getConn().prepareStatement(sb.toString()); 
+		_prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
+		for (int i1 = 0; i1 < 3; i1++)
+		{
+			try
+			{
+				_conn = getConn();
 			}
 			catch (Exception e)
 			{
+				try
+				{
+					Thread.sleep(_sleepForRetries);
+				}
+				catch (InterruptedException eInt)
+				{
+					return;
+				}
+			}
+		}
+	} // ________________________________
+
+	private JdbcCleanConn getConn() throws Exception
+	{
+		if (null == _conn)
+		{
+			SimpleDataSource DS = new SimpleDataSource(_epr.getDriver(), _epr
+					.getURL(), _epr.getUserName(), _epr.getPassword());
+			_conn = new JdbcCleanConn(DS);
+		}
+		return _conn;
+	} // ________________________________
+
+	protected PreparedStatement listStatement()
+	{
+		if (null == _prepGetList)
+
+			try
+			{
+				String[] columns =
+				{ _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
+
+				StringBuilder sb = new StringBuilder("select");
+				int i1 = 0;
+				for (String col : columns)
+					sb.append((i1++ < 1) ? " " : ",").append(col);
+				sb.append(" from ").append(_epr.getTableName());
+				sb.append(" where ").append(_epr.getStatusColumn())
+						.append("='").append(State.Pending.getColumnValue())
+						.append("'").append(" order by 2");
+				;
+				_prepGetList = getConn().prepareStatement(sb.toString());
+			}
+			catch (Exception e)
+			{
+				_logger.error("Unable to prepare SQL statement", e);
+				return null;
+			}
+		return _prepGetList;
+	} // ________________________________
+
+	protected PreparedStatement select4UpdateStatement()
+	{
+		if (_prepSel4Upd == null)
+		{
+			try
+			{
+				/*
+				 * TODO make this dynamic using a factory pattern.
+				 */
+
+				StringBuilder sb = null;
+
+				if (!_epr.getURL().contains("hsqldb"))
+				{
+					sb = new StringBuilder("select ").append(
+							_epr.getDataColumn()).append(" from ").append(
+							_epr.getTableName()).append(" where ").append(
+							_epr.getMessageIdColumn()).append("=?").append(
+							" and ").append(_epr.getStatusColumn())
+							.append("=?").append(" for update");
+				}
+				else
+				{
+					/*
+					 * HSQL does not support FOR UPDATE! All tables appear to
+					 * be inherently updatable!
+					 */
+					
+					sb = new StringBuilder("select ").append(
+							_epr.getDataColumn()).append(" from ").append(
+							_epr.getTableName()).append(" where ").append(
+							_epr.getMessageIdColumn()).append("=?").append(
+							" and ").append(_epr.getStatusColumn())
+							.append("=?");
+				}
+
+				_prepSel4Upd = getConn().prepareStatement(sb.toString());
+			}
+			catch (Exception e)
+			{
+				e.printStackTrace();
+
 				_logger.error(e);
 				return null;
 			}
+		}
+
+		return _prepSel4Upd;
+	} // ________________________________
+
+	protected PreparedStatement updateStatusStatement()
+	{
+		if (null == _prepUpdateStatus)
+			try
+			{
+				StringBuilder sb = new StringBuilder("update ").append(
+						_epr.getTableName()).append(" set ").append(
+						_epr.getStatusColumn()).append("= ?").append(" where ")
+						.append(_epr.getMessageIdColumn()).append("=?");
+				_prepUpdateStatus = getConn().prepareStatement(sb.toString());
+			}
+			catch (Exception e)
+			{
+				_logger.error(e);
+				return null;
+			}
 		return _prepUpdateStatus;
-    } //________________________________
+	} // ________________________________
 
-    protected PreparedStatement insertStatement()
-    {
-    	if (null==_prepInsert)
-	    	try
-	    	{
-		        String[] columns =	{_epr.getMessageIdColumn()	,_epr.getDataColumn()
-		        					,_epr.getStatusColumn()		,_epr.getTimestampColumn()};
-		    	
-		        StringBuilder sb = new StringBuilder ("insert into ").append(_epr.getTableName())
-		        	.append("(");
-		    	int i1 = 0;
-		    	for (String col:columns)
-		    		sb.append((i1++<1)?" ":",").append(col);
-		    	sb.append(") values (?,?,?,?)");
-		    	_prepInsert = getConn().prepareStatement(sb.toString());
+	protected PreparedStatement insertStatement()
+	{
+		if (null == _prepInsert)
+			try
+			{
+				String[] columns =
+				{ _epr.getMessageIdColumn(), _epr.getDataColumn(),
+						_epr.getStatusColumn(), _epr.getTimestampColumn() };
+
+				StringBuilder sb = new StringBuilder("insert into ").append(
+						_epr.getTableName()).append("(");
+				int i1 = 0;
+				for (String col : columns)
+					sb.append((i1++ < 1) ? " " : ",").append(col);
+				sb.append(") values (?,?,?,?)");
+				_prepInsert = getConn().prepareStatement(sb.toString());
 			}
 			catch (Exception e)
 			{
@@ -376,16 +478,17 @@
 				return null;
 			}
 		return _prepInsert;
-    } //________________________________
+	} // ________________________________
 
-    protected PreparedStatement deleteStatement()
-    {
-    	if (null==_prepDelete)
-	    	try
-	    	{
-		        StringBuilder sb = new StringBuilder ("delete from ").append(_epr.getTableName())
-		        	.append(" where ").append(_epr.getMessageIdColumn()).append(" =?");
-		    	_prepDelete = getConn().prepareStatement(sb.toString());
+	protected PreparedStatement deleteStatement()
+	{
+		if (null == _prepDelete)
+			try
+			{
+				StringBuilder sb = new StringBuilder("delete from ").append(
+						_epr.getTableName()).append(" where ").append(
+						_epr.getMessageIdColumn()).append(" =?");
+				_prepDelete = getConn().prepareStatement(sb.toString());
 			}
 			catch (Exception e)
 			{
@@ -393,25 +496,36 @@
 				return null;
 			}
 		return _prepDelete;
-    } //________________________________
+	} // ________________________________
 
-    protected enum State
-    {	Pending, WorkInProgress, Done, Error;
-    	String	getColumnValue() {return toString().substring(0,1); }
-    };
+	protected enum State
+	{
+		Pending, WorkInProgress, Done, Error;
+		String getColumnValue()
+		{
+			return toString().substring(0, 1);
+		}
+	};
 
-    protected long 				_sleepForRetries = 3000;   //  milliseconds
+	protected long _sleepForRetries = 3000; // milliseconds
 
-    protected boolean			_postDelete	,_errorDelete;
-	protected boolean			_isReceiver;	  
-    protected JDBCEpr			_epr;
-    protected JdbcCleanConn		_conn;
-    protected PreparedStatement _prepGetList;;
-    protected PreparedStatement _prepSel4Upd;
-    protected PreparedStatement _prepUpdateStatus;
-    protected PreparedStatement	_prepInsert;
-    protected PreparedStatement	_prepDelete;
-    
+	protected boolean _postDelete, _errorDelete;
 
-    protected static Logger		_logger = Logger.getLogger(SqlTableCourier.class);
+	protected boolean _isReceiver;
+
+	protected JDBCEpr _epr;
+
+	protected JdbcCleanConn _conn;
+
+	protected PreparedStatement _prepGetList;;
+
+	protected PreparedStatement _prepSel4Upd;
+
+	protected PreparedStatement _prepUpdateStatus;
+
+	protected PreparedStatement _prepInsert;
+
+	protected PreparedStatement _prepDelete;
+
+	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
 }

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java	2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/LocalFileHandler.java	2006-12-21 12:07:36 UTC (rev 8488)
@@ -14,17 +14,23 @@
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.util.Util;
 
-public class LocalFileHandler implements FileHandler 
+public class LocalFileHandler implements FileHandler
 {
-	private LocalFileHandler() {}
-	LocalFileHandler(FileEpr epr) {_epr = epr; }
+	private LocalFileHandler()
+	{
+	}
 
-	public boolean deleteFile(File file) throws CourierException 
+	LocalFileHandler(FileEpr epr)
 	{
+		_epr = epr;
+	}
+
+	public boolean deleteFile(File file) throws CourierException
+	{
 		return file.delete();
 	}
 
-	public byte[] getFileContents(File file) throws CourierException 
+	public byte[] getFileContents(File file) throws CourierException
 	{
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
 		byte[] ba = new byte[1000];
@@ -33,66 +39,93 @@
 		try
 		{
 			FileInputStream inp = new FileInputStream(file);
-			while (-1!= (iQread=inp.read(ba)))
+			while (-1 != (iQread = inp.read(ba)))
 				if (iQread > 0)
-					out.write(ba,0,iQread);
+					out.write(ba, 0, iQread);
 			inp.close();
 			out.close();
 			return out.toByteArray();
 		}
-		catch(FileNotFoundException e) 	{throw new CourierException(e);}
-		catch (IOException e)			{throw new CourierException(e);}
+		catch (FileNotFoundException e)
+		{
+			throw new CourierException(e);
+		}
+		catch (IOException e)
+		{
+			throw new CourierException(e);
+		}
 
-		// Just in case... (comment next line out to see if you're missing something)
-		catch (Exception e)				{throw new CourierException(e);}
+		// Just in case... (comment next line out to see if you're missing
+		// something)
+		catch (Exception e)
+		{
+			throw new CourierException(e);
+		}
 	}
 
-	public File[] getFileList() throws CourierException 
+	public File[] getFileList() throws CourierException
 	{
 		try
 		{
 			File dir = new File(_epr.getURL().getFile());
-			if (! dir.isDirectory())
-				throw new CourierException("Can't get file list if URL is not a directory");
+			if (!dir.isDirectory())
+				throw new CourierException(
+						"Can't get file list if URL is not a directory");
 
 			FileFilter filter = new FileEndsWith(_epr.getInputSuffix());
 			return dir.listFiles(filter);
 		}
-		catch (URISyntaxException e)	{throw new CourierException(e);}
-		catch (MalformedURLException e)	{throw new CourierException(e);}
-		// Just in case... (comment next line out to see if you're missing something)
-		catch (Exception e)				{throw new CourierException(e);}
+		catch (URISyntaxException e)
+		{
+			throw new CourierException(e);
+		}
+		catch (MalformedURLException e)
+		{
+			throw new CourierException(e);
+		}
+		// Just in case... (comment next line out to see if you're missing
+		// something)
+		catch (Exception e)
+		{
+			throw new CourierException(e);
+		}
 	}
 
-	public boolean renameFile(File from, File to) throws CourierException 
+	public boolean renameFile(File from, File to) throws CourierException
 	{
 		try
-		{ 
+		{
 			to.delete();
 			if (!from.renameTo(to))
-				throw new CourierException("Unable to rename from "+from+" to "+to);
+				throw new CourierException("Unable to rename from " + from
+						+ " to " + to);
 			return true;
 		}
-		catch (Exception e)				{throw new CourierException(e);}
+		catch (Exception e)
+		{
+			throw new CourierException(e);
+		}
 	}
 
 	private class FileEndsWith implements FileFilter
-    {
-      String m_sSuffix;
-      FileEndsWith(String p_sEnd) throws CourierException
-      {
-        m_sSuffix = p_sEnd;
-        if (Util.isNullString(m_sSuffix))
-          throw new CourierException("A file suffix (or full Message id) must be specified for pickup");
-      } //______________________________
+	{
+		String m_sSuffix;
 
-      public boolean accept(File p_f)
-      {	return (p_f.isFile())
-        	? p_f.toString().endsWith(m_sSuffix)
-        	: false;
-      } //______________________________
-    } //____________________________________________________
-   
+		FileEndsWith(String p_sEnd) throws CourierException
+		{
+			m_sSuffix = p_sEnd;
+			if (Util.isNullString(m_sSuffix))
+				throw new CourierException(
+						"A file suffix (or full Message id) must be specified for pickup");
+		} // ______________________________
+
+		public boolean accept(File p_f)
+		{
+			return (p_f.isFile()) ? p_f.toString().endsWith(m_sSuffix) : false;
+		} // ______________________________
+	} // ____________________________________________________
+
 	protected FileEpr _epr;
+
 	static Logger _logger = Logger.getLogger(LocalFileHandler.class);
 }

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2006-12-21 11:58:11 UTC (rev 8487)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2006-12-21 12:07:36 UTC (rev 8488)
@@ -32,7 +32,8 @@
 
 import org.apache.log4j.Logger;
 
-public class JdbcCleanConn {
+public class JdbcCleanConn
+{
 	private DataSource m_oDS = null;
 
 	private Connection m_conn = null;
@@ -41,78 +42,107 @@
 
 	protected Logger m_oLogger;
 
-	public JdbcCleanConn(DataSource p_oDS) throws Exception {
+	public JdbcCleanConn(DataSource p_oDS) throws Exception
+	{
 		m_oDS = p_oDS;
 		m_oLogger = Logger.getLogger(this.getClass());
 	}
 
-	public void commit() throws Exception {
-		if (null != m_conn) {
+	public void commit() throws Exception
+	{
+		if (null != m_conn)
+		{
 			m_conn.commit();
-        }
+		}
 	}
 
-	public void rollback() throws Exception {
-		if (null != m_conn) {
+	public void rollback() throws Exception
+	{
+		if (null != m_conn)
+		{
 			m_conn.rollback();
-        }
+		}
 	}
-    
-	public void release() {
-		if (null != m_conn) {
-			try {
+
+	public void release()
+	{
+		if (null != m_conn)
+		{
+			try
+			{
 				m_conn.rollback();
-			} catch (Exception eRoll) {
 			}
+			catch (Exception eRoll)
+			{
+			}
 
 			for (PreparedStatement PS : m_olPrepSt)
-				try {
+				try
+				{
 					PS.close();
-				} catch (Exception e) {
 				}
-			try {
+				catch (Exception e)
+				{
+				}
+			try
+			{
 				m_conn.close();
-			} catch (Exception e1) {
 			}
+			catch (Exception e1)
+			{
+			}
 		}
 		m_olPrepSt.clear();
 		m_conn = null;
 	} // __________________________________
 
 	public PreparedStatement prepareStatement(String p_sSt, int p_i1, int p_i2)
-			throws Exception {
-        if (null == m_conn) {
-            connect();
-        }
+			throws Exception
+	{
+		if (null == m_conn)
+		{
+			connect();
+		}
 		PreparedStatement PS = m_conn.prepareStatement(p_sSt, p_i1, p_i2);
 		m_olPrepSt.add(PS);
 		return PS;
 	} // __________________________________
 
-	public PreparedStatement prepareStatement(String p_sSt) throws Exception {
-		if (null == m_conn) {
+	public PreparedStatement prepareStatement(String p_sSt) throws Exception
+	{
+		if (null == m_conn)
+		{
 			connect();
-        }
-        
+		}
+
 		PreparedStatement PS = m_conn.prepareStatement(p_sSt);
 		m_olPrepSt.add(PS);
 		return PS;
 	} // __________________________________
 
-	public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
-        if (null == m_conn) {
-            connect();
-        }
-        
+	public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry)
+			throws Exception
+	{
+		if (null == m_conn)
+		{
+			connect();
+		}
+
 		Exception eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
-		for (int i1 = 0; i1 < iQtry; i1++) {
-			try {
+		for (int i1 = 0; i1 < iQtry; i1++)
+		{
+			try
+			{
 				return p_PS.executeQuery();
-			} catch (Exception e) {
+			}
+			catch (Exception e)
+			{
 				if (null == eRet)
 					eRet = new Exception(e.getMessage());
-				// System.out.println("Retrying "+i1);
+
+				// TODO magic number!!
+
 				Thread.sleep(100 + (new Double(100 * Math.random()))
 						.longValue());
 			}
@@ -121,18 +151,25 @@
 		throw eRet;
 	} // __________________________________
 
-	public void execUpdWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
-        if (null == m_conn) {
-            connect();
-        }
-        
+	public void execUpdWait(PreparedStatement p_PS, int p_iQtry)
+			throws Exception
+	{
+		if (null == m_conn)
+		{
+			connect();
+		}
+
 		Exception eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
-		for (int i1 = 0; i1 < iQtry; i1++) {
-			try {
+		for (int i1 = 0; i1 < iQtry; i1++)
+		{
+			try
+			{
 				p_PS.executeUpdate();
 				return;
-			} catch (Exception e) {
+			}
+			catch (Exception e)
+			{
 				if (null == eRet)
 					eRet = e;
 				// System.out.println("Retrying "+i1);
@@ -144,29 +181,47 @@
 		throw eRet;
 	} // __________________________________
 
-	private void connect() throws Exception {
-        if(m_conn != null) {
-            return;
-        }
-        
+	private void connect() throws Exception
+	{
+		if (m_conn != null)
+		{
+			return;
+		}
+
 		Exception eRet = null;
-		for (int i1 = 0; i1 < 5; i1++) {
-			try {
+		for (int i1 = 0; i1 < 5; i1++)
+		{
+			try
+			{
 				m_conn = m_oDS.getConnection();
 				eRet = null;
 				break;
-			} catch (Exception e) {
+			}
+			catch (Exception e)
+			{
 				if (null == eRet)
 					eRet = e;
-				System.out.println("Connecting " + i1);
+
+				// TODO magic number!!
+
 				Thread.sleep(2000 + (new Double(100 * Math.random()))
 						.longValue());
 			}
 		}
-		if (null != eRet) {
+
+		if (eRet != null)
+		{
 			m_oLogger.error("connect() FAILED", eRet);
 			throw eRet;
 		}
+
+		if (m_conn == null)
+		{
+			m_oLogger.error("connect() FAILED: no connection");
+
+			throw new RuntimeException();
+		}
+
 		m_conn.setAutoCommit(false);
 		m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
 

Modified: labs/jbossesb/trunk/product/docs/ReleaseNotes.odt
===================================================================
(Binary files differ)




More information about the jboss-svn-commits mailing list