[jboss-svn-commits] JBL Code SVN: r21044 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Jul 15 14:10:28 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-07-15 14:10:27 -0400 (Tue, 15 Jul 2008)
New Revision: 21044
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
Log:
Added support for binary/character types: JBESB-1805
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-07-15 17:41:27 UTC (rev 21043)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-07-15 18:10:27 UTC (rev 21044)
@@ -29,13 +29,15 @@
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.common.TransactionStrategy;
import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.*;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.couriers.CourierTransportException;
+import org.jboss.soa.esb.couriers.FaultMessageException;
import org.jboss.soa.esb.listeners.message.errors.Factory;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.util.Type;
import org.jboss.soa.esb.util.Util;
-import java.io.Reader;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
@@ -49,6 +51,9 @@
import java.sql.Types;
import java.util.UUID;
+import javax.sql.rowset.serial.SerialBlob;
+import javax.sql.rowset.serial.SerialClob;
+
public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
protected long _pollLatency = 200;
@@ -57,6 +62,8 @@
protected boolean deleteOnSuccess, deleteOnError;
protected boolean _isReceiver;
+
+ private int messageType = Types.OTHER ;
private JDBCEprDBResourceFactory jdbcFactory;
@@ -95,7 +102,7 @@
throw new CourierException(e);
}
- jdbcFactory = new JDBCEprDBResourceFactory(epr);
+ jdbcFactory = new JDBCEprDBResourceFactory(epr);
}
public void cleanup() {
@@ -138,9 +145,9 @@
boolean transactional = isTransactional();
- Serializable serilaizedMessage;
+ Serializable serializedMessage;
try {
- serilaizedMessage = Util.serialize(message);
+ serializedMessage = Util.serialize(message);
} catch (Exception e) {
throw new CourierTransportException("Unable to serialize ESB Message.", e);
}
@@ -151,7 +158,32 @@
PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
try {
insertStatement.setString(1, msgId);
- insertStatement.setObject(2, serilaizedMessage);
+
+ final int type = getMessageType(connection) ;
+ switch(type)
+ {
+ case Types.BLOB:
+ final Blob blob = new SerialBlob(serializedMessage.toString().getBytes()) ;
+ insertStatement.setBlob(2, blob) ;
+ break ;
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ final byte[] data = serializedMessage.toString().getBytes() ;
+ insertStatement.setBytes(2, data) ;
+ break ;
+ case Types.CLOB:
+ final Clob clob = new SerialClob(serializedMessage.toString().toCharArray()) ;
+ insertStatement.setClob(2, clob) ;
+ break ;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ insertStatement.setString(2, serializedMessage.toString()) ;
+ break ;
+ default:
+ insertStatement.setObject(2, serializedMessage);
+ }
insertStatement.setString(3, State.Pending.getColumnValue());
insertStatement.setLong(4, System.currentTimeMillis());
@@ -290,8 +322,6 @@
selectUpdateStatement.setString(2, State.Pending.getColumnValue());
ResultSet resultSet = selectUpdateStatement.executeQuery();
- final ResultSetMetaData metaData = resultSet.getMetaData() ;
- final int type = metaData.getColumnType(1) ;
try
{
if (resultSet.next())
@@ -301,19 +331,32 @@
try
{
final Serializable value ;
- if (type == Types.BLOB)
+ final int type = getMessageType(resultSet) ;
+ switch (type)
{
+ case Types.BLOB:
final Blob blob = resultSet.getBlob(1) ;
- value = StreamUtils.readStreamString(blob.getBinaryStream(), "UTF-8");
- }
- else if (type == Types.CLOB)
- {
+ final byte[] blobData = StreamUtils.readStream(blob.getBinaryStream()) ;
+ value = new String(blobData) ;
+ break ;
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ final byte[] binaryData = StreamUtils.readStream(resultSet.getBinaryStream(1)) ;
+ value = new String(binaryData) ;
+ break ;
+ case Types.CLOB:
final Clob clob = resultSet.getClob(1) ;
value = StreamUtils.readReader(clob.getCharacterStream());
- }
- else
- {
+ break ;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ value = resultSet.getString(1) ;
+ break ;
+ default:
value = (Serializable) resultSet.getObject(1);
+ break ;
}
result = Util.deserialize(value);
}
@@ -425,4 +468,53 @@
}
return transactional;
}
-}
\ No newline at end of file
+
+ private synchronized int getMessageType(final Connection connection)
+ throws SQLException
+ {
+ if (messageType != Types.OTHER)
+ {
+ return messageType ;
+ }
+
+ final PreparedStatement ps = jdbcFactory.createSelect4UpdateStatement(connection) ;
+ try
+ {
+ ps.setString(1, "");
+ ps.setString(2, State.Pending.getColumnValue());
+
+ final ResultSet resultSet = ps.executeQuery();
+ try
+ {
+ return getMessageType(resultSet) ;
+ }
+ finally
+ {
+ try
+ {
+ resultSet.close() ;
+ }
+ catch (final Throwable th) {} // ignore
+ }
+ }
+ finally
+ {
+ try
+ {
+ ps.close() ;
+ }
+ catch (final Throwable th) {} // ignore
+ }
+ }
+
+ private synchronized int getMessageType(final ResultSet resultSet)
+ throws SQLException
+ {
+ if (messageType == Types.OTHER)
+ {
+ final ResultSetMetaData metaData = resultSet.getMetaData() ;
+ messageType = metaData.getColumnType(1) ;
+ }
+ return messageType ;
+ }
+}
More information about the jboss-svn-commits
mailing list