[jboss-svn-commits] JBL Code SVN: r21044 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Jul 15 14:10:28 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-07-15 14:10:27 -0400 (Tue, 15 Jul 2008)
New Revision: 21044

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
Log:
Added support for binary/character types: JBESB-1805

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-07-15 17:41:27 UTC (rev 21043)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-07-15 18:10:27 UTC (rev 21044)
@@ -29,13 +29,15 @@
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.common.TransactionStrategy;
 import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.*;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.couriers.CourierTransportException;
+import org.jboss.soa.esb.couriers.FaultMessageException;
 import org.jboss.soa.esb.listeners.message.errors.Factory;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.util.Type;
 import org.jboss.soa.esb.util.Util;
 
-import java.io.Reader;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -49,6 +51,9 @@
 import java.sql.Types;
 import java.util.UUID;
 
+import javax.sql.rowset.serial.SerialBlob;
+import javax.sql.rowset.serial.SerialClob;
+
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
     protected long _pollLatency = 200;
@@ -57,6 +62,8 @@
 
     protected boolean deleteOnSuccess, deleteOnError;
 	protected boolean _isReceiver;
+	
+	private int messageType = Types.OTHER ;
 
     private JDBCEprDBResourceFactory jdbcFactory;
 
@@ -95,7 +102,7 @@
 			throw new CourierException(e);
 		}
 
-        jdbcFactory = new JDBCEprDBResourceFactory(epr);
+	        jdbcFactory = new JDBCEprDBResourceFactory(epr);
 	}
 
 	public void cleanup() {
@@ -138,9 +145,9 @@
 
         boolean transactional = isTransactional();
 
-        Serializable serilaizedMessage;
+        Serializable serializedMessage;
         try {
-            serilaizedMessage = Util.serialize(message);
+            serializedMessage = Util.serialize(message);
         } catch (Exception e) {
             throw new CourierTransportException("Unable to serialize ESB Message.", e);
         }
@@ -151,7 +158,32 @@
             PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
             try {
                 insertStatement.setString(1, msgId);
-                insertStatement.setObject(2, serilaizedMessage);
+                
+                final int type = getMessageType(connection) ;
+                switch(type)
+                {
+                case Types.BLOB:
+                    final Blob blob = new SerialBlob(serializedMessage.toString().getBytes()) ;
+                    insertStatement.setBlob(2, blob) ;
+                    break ;
+                case Types.BINARY:
+                case Types.VARBINARY:
+                case Types.LONGVARBINARY:
+                    final byte[] data = serializedMessage.toString().getBytes() ;
+                    insertStatement.setBytes(2, data) ;
+                    break ;
+                case Types.CLOB:
+                    final Clob clob = new SerialClob(serializedMessage.toString().toCharArray()) ;
+                    insertStatement.setClob(2, clob) ;
+                    break ;
+                case Types.CHAR:
+                case Types.VARCHAR:
+                case Types.LONGVARCHAR:
+                    insertStatement.setString(2, serializedMessage.toString()) ;
+                    break ;
+                default:
+                    insertStatement.setObject(2, serializedMessage);
+                }
                 insertStatement.setString(3, State.Pending.getColumnValue());
                 insertStatement.setLong(4, System.currentTimeMillis());
 
@@ -290,8 +322,6 @@
             selectUpdateStatement.setString(2, State.Pending.getColumnValue());
 
             ResultSet resultSet = selectUpdateStatement.executeQuery();
-            final ResultSetMetaData metaData = resultSet.getMetaData() ;
-            final int type = metaData.getColumnType(1) ;
             try
             {
                 if (resultSet.next())
@@ -301,19 +331,32 @@
                     try
                     {
                         final Serializable value ;
-                        if (type == Types.BLOB)
+                        final int type = getMessageType(resultSet) ;
+                        switch (type)
                         {
+                        case Types.BLOB:
                             final Blob blob = resultSet.getBlob(1) ;
-                            value = StreamUtils.readStreamString(blob.getBinaryStream(), "UTF-8");
-                        }
-                        else if (type == Types.CLOB)
-                        {
+                            final byte[] blobData = StreamUtils.readStream(blob.getBinaryStream()) ;
+                            value = new String(blobData) ;
+                            break ;
+                        case Types.BINARY:
+                        case Types.VARBINARY:
+                        case Types.LONGVARBINARY:
+                            final byte[] binaryData = StreamUtils.readStream(resultSet.getBinaryStream(1)) ;
+                            value = new String(binaryData) ;
+                            break ;
+                        case Types.CLOB:
                             final Clob clob = resultSet.getClob(1) ;
                             value = StreamUtils.readReader(clob.getCharacterStream());
-                        }
-                        else
-                        {
+                            break ;
+                        case Types.CHAR:
+                        case Types.VARCHAR:
+                        case Types.LONGVARCHAR:
+                            value = resultSet.getString(1) ;
+                            break ;
+                        default:
                             value = (Serializable) resultSet.getObject(1);
+                            break ;
                         }
                         result = Util.deserialize(value);
                     }
@@ -425,4 +468,53 @@
         }
         return transactional;
     }
-}
\ No newline at end of file
+    
+    private synchronized int getMessageType(final Connection connection)
+        throws SQLException
+    {
+        if (messageType != Types.OTHER)
+        {
+            return messageType ;
+        }
+        
+        final PreparedStatement ps = jdbcFactory.createSelect4UpdateStatement(connection) ;
+        try
+        {
+                ps.setString(1, "");
+                ps.setString(2, State.Pending.getColumnValue());
+
+                final ResultSet resultSet = ps.executeQuery();
+                try
+                {
+                    return getMessageType(resultSet) ;
+                }
+                finally
+                {
+                    try
+                    {
+                        resultSet.close() ;
+                    }
+                    catch (final Throwable th) {} // ignore
+                }
+        }
+        finally
+        {
+            try
+            {
+                ps.close() ;
+            }
+            catch (final Throwable th) {} // ignore
+        }
+    }
+    
+    private synchronized int getMessageType(final ResultSet resultSet)
+        throws SQLException
+    {
+        if (messageType == Types.OTHER)
+        {
+            final ResultSetMetaData metaData = resultSet.getMetaData() ;
+            messageType = metaData.getColumnType(1) ;
+        }
+        return messageType ;
+    }
+}




More information about the jboss-svn-commits mailing list