[jboss-svn-commits] JBL Code SVN: r22517 - in labs/jbossesb/workspace/skeagh: routing/jms/src/main/java/org/jboss/esb/jms and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 8 11:03:56 EDT 2008


Author: beve
Date: 2008-09-08 11:03:55 -0400 (Mon, 08 Sep 2008)
New Revision: 22517

Added:
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java
Modified:
   labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java
   labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java
Log:
Added an adaptor for StreamMessage to InputStream.


Added: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java	2008-09-08 15:03:55 UTC (rev 22517)
@@ -0,0 +1,187 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2008, Red Hat Middleware
+ * LLC, and individual contributors by the @authors tag. See the copyright.txt
+ * in the distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.esb.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+/**
+ * Adapts/Wraps a javax.jms.StreamMessage to be compatible with a InputStream.
+ *
+ * @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
+ *
+ */
+public class StreamMessageInputStream extends InputStream
+{
+    /**
+     * StreamMessage being wrapped.
+     */
+    private final StreamMessage streamMessage;
+
+    /**
+     * Sole constructor that takes the StreamMessage that is to be wrapped.
+     * @param message       the StreamMessage to adapt/wrap
+     */
+    public StreamMessageInputStream(final StreamMessage message)
+    {
+        this.streamMessage = message;
+    }
+
+    /**
+     * No operation.
+     *
+     * @throws IOException  - cannot be thrown as this impl does nothing.
+     */
+    @Override
+    public final void close() throws IOException
+    {
+        //  NoOp
+    }
+
+    /**
+     * Will try to read the number of bytes into the passed in
+     * byte[].
+     * @param b             the byte[] into which data should be read.
+     * @return int          the number of bytes read of -1 in there is nothing more to read.
+     * @throws IOException  if an exception occurs while reading.
+     */
+    @Override
+    public final int read(final byte[] b) throws IOException
+    {
+        try
+        {
+            return streamMessage.readBytes(b);
+        }
+        catch (final JMSException e)
+        {
+            return -1;
+        }
+    }
+
+    /**
+     * Will try to call reset on the underlying StreamMessage.
+     *
+     * @throws IOException if an exception occurs while trying to reset.
+     * @see java.io.InputStream#markSupported()
+     */
+    @Override
+    public final synchronized void reset() throws IOException
+    {
+        try
+        {
+            streamMessage.reset();
+        }
+        catch (final JMSException e)
+        {
+            throw new IOException(e.getMessage());
+        }
+    }
+
+    /**
+     * Will always return false for this implementation.
+     *
+     * @return false    this method will always return false.
+     */
+    @Override
+    public final boolean markSupported()
+    {
+        return false;
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thown.
+     *
+     * @return int         Cannot be returned.
+     * @throws IOException Cannot be thrown.
+     */
+    @Override
+    public final int read() throws IOException
+    {
+        throw new UnsupportedOperationException("read() is not supported bye this implemenation.");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thown.
+     *
+     * @return int          Cannot be returned
+     * @throws IOException  Cannot be thrown.
+     *
+     * @see InputStream#available()
+     */
+    @Override
+    public final int available() throws IOException
+    {
+        throw new UnsupportedOperationException("available() is not supported bye this implemenation.");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thown.
+     *
+     * @param b            ignored.
+     * @param off          ignored.
+     * @param len          ignored.
+     * @return int         Cannot be returned.
+     * @throws IOException Cannot be thrown.
+     */
+    @Override
+    public final int read(final byte[] b, final int off, final int len) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thown.
+     *
+     * @param n             ignored
+     * @return long         Cannot be returned.
+     * @throws IOException  Cannot be thrown.
+     */
+    @Override
+    public final long skip(final long n) throws IOException
+    {
+        throw new UnsupportedOperationException("skip method not supported by this implementation");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thown.
+     *
+     * @param readlimit     ignored
+     */
+    @Override
+    public final synchronized void mark(final int readlimit)
+    {
+        throw new UnsupportedOperationException("mark(int)");
+    }
+
+    /**
+     * Returns the underlying StreamMessage.
+     *
+     * @return StreamMessage    the StreamMessage being wrapped.
+     */
+    public final StreamMessage getStreamMessage()
+    {
+        return streamMessage;
+    }
+
+}

Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java	2008-09-08 14:21:34 UTC (rev 22516)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java	2008-09-08 15:03:55 UTC (rev 22517)
@@ -44,11 +44,6 @@
 public final class JmsPayloadExtractor
 {
     /**
-     * Logger instance.
-     */
-    private static final Logger LOGGER = Logger.getLogger(JmsPayloadExtractor.class);
-
-    /**
      * Private constructor.
      */
     private JmsPayloadExtractor()
@@ -57,7 +52,16 @@
     }
 
     /**
-     * Add docs.
+     * Extracts the content from the passed in JMSMessage object.
+     * <br>
+     * The following conversions are done bases on the message type:
+     * <lu>
+     *  <li>TestMessage     ->      java.lang.String</li>
+     *  <li>ObjectMessage   ->      java.lang.Object</li>
+     *  <li>BytesMessage    ->      byte[]</li>
+     *  <li>MapMessage      ->      java.util.Map</li>
+     *  <li>StreamMessage   ->      org.jboss.esb.jms.StreamMessageInputStream</li>
+     * </lu>
      *
      * @param jmsMessage        the JMS Message object instance
      * @return Message          the populated ESB Message object instance.
@@ -88,61 +92,31 @@
             }
             else if (jmsMessage instanceof BytesMessage)
             {
-                BytesMessage bytesMessage = (BytesMessage)jmsMessage;
-                bytesMessage.reset();
-                int msgLength = 0;
-                final int buffSize = 50000;
-                byte[] data = new byte[buffSize];
-                //  calculate the nr of bytes in the message
-                while (true)
-                {
-                    int len = bytesMessage.readBytes(data);
-                    if (len > 0)
-                    {
-                        msgLength += len;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-                if (msgLength == 0)
-                {
-                    throw new RoutingException("Content in JMSMessage [" + jmsMessage + "] was of zero length");
-                }
-                byte[] content = new byte[msgLength];
-                if (msgLength <= buffSize)
-                {
-                    System.arraycopy(data, 0, content, 0, msgLength);
-                }
-                else
-                {
-                   bytesMessage.reset();
-                   bytesMessage.readBytes(content);
-                }
+                byte[] content = readBytes(jmsMessage);
                 esbMessage.setPayload(content);
             }
             else if (jmsMessage instanceof MapMessage)
             {
-                final MapMessage map = (MapMessage)jmsMessage;
-                final Map newMap = new HashMap();
+                final MapMessage jmsMap = (MapMessage)jmsMessage;
+                final Map<String,Object> esbMap = new HashMap<String,Object>();
 
-                Enumeration mapNames = map.getMapNames();
+                final Enumeration<?> mapNames = jmsMap.getMapNames();
                 while (mapNames.hasMoreElements())
                 {
                     final String key = (String)mapNames.nextElement();
-                    final Object value = map.getObject(key);
-                    newMap.put(key, value);
+                    final Object value = jmsMap.getObject(key);
+                    esbMap.put(key, value);
                 }
-                if (newMap.isEmpty())
+                if (esbMap.isEmpty())
                 {
                     throw new RoutingException("Map in JMSMessage [" + jmsMessage + "] contained zero key value pairs.");
                 }
-                esbMessage.setPayload(newMap);
+                esbMessage.setPayload(esbMap);
             }
             else if (jmsMessage instanceof StreamMessage)
             {
-                LOGGER.info("StreamMessage");
+                StreamMessage streamMessage = (StreamMessage)jmsMessage;
+                esbMessage.setPayload(new StreamMessageInputStream(streamMessage));
             }
             else
             {
@@ -156,4 +130,41 @@
         return esbMessage;
     }
 
+    private static byte[] readBytes(final javax.jms.Message jmsMessage) throws JMSException, RoutingException
+    {
+        final BytesMessage bytesMessage = (BytesMessage)jmsMessage;
+        bytesMessage.reset();
+        int msgLength = 0;
+        final int buffSize = 50000;
+        byte[] data = new byte[buffSize];
+        //  calculate the nr of bytes in the message
+        while (true)
+        {
+            int len = bytesMessage.readBytes(data);
+            if (len > 0)
+            {
+                msgLength += len;
+            }
+            else
+            {
+                break;
+            }
+        }
+        if (msgLength == 0)
+        {
+            throw new RoutingException("Content in JMSMessage [" + jmsMessage + "] was of zero length");
+        }
+        byte[] content = new byte[msgLength];
+        if (msgLength <= buffSize)
+        {
+            System.arraycopy(data, 0, content, 0, msgLength);
+        }
+        else
+        {
+           bytesMessage.reset();
+           bytesMessage.readBytes(content);
+        }
+        return content;
+    }
+
 }

Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java	2008-09-08 14:21:34 UTC (rev 22516)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java	2008-09-08 15:03:55 UTC (rev 22517)
@@ -23,6 +23,9 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 import javax.jms.JMSException;
@@ -33,8 +36,11 @@
 import com.mockrunner.mock.jms.MockBytesMessage;
 import com.mockrunner.mock.jms.MockMapMessage;
 import com.mockrunner.mock.jms.MockObjectMessage;
+import com.mockrunner.mock.jms.MockStreamMessage;
 import com.mockrunner.mock.jms.MockTextMessage;
 
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
 /**
  * Test for {@link JmsPayloadExtractor}.
  *
@@ -122,4 +128,51 @@
         JmsPayloadExtractor.extractPayload(mapMessage);
     }
 
+    @Test
+    public void extractContentStreamMessageStreamAdapter() throws RoutingException, JMSException, ClassNotFoundException, IOException
+    {
+        final byte[] content = "testing text message content".getBytes();
+        final MockStreamMessage streamMessage = new MockStreamMessage();
+        streamMessage.writeObject(content);
+
+        final Message esbMessage = JmsPayloadExtractor.extractPayload(streamMessage);
+
+        Object payload = esbMessage.getPayload();
+        assertTrue(payload instanceof InputStream);
+        StreamMessageInputStream adapter = (StreamMessageInputStream)payload;
+        adapter.reset();
+        Object readObject = adapter.getStreamMessage().readObject();
+        assertTrue( readObject instanceof byte[]);
+        assertEquals( new String(content), new String((byte[])readObject));
+    }
+
+    @Test
+    public void extractContentStreamMessageInputStream() throws RoutingException, JMSException, ClassNotFoundException, IOException
+    {
+        final byte[] content = "testing text message content".getBytes();
+        final MockStreamMessage streamMessage = new MockStreamMessage();
+        streamMessage.writeObject(content);
+
+        long start = System.nanoTime();
+        final Message esbMessage = JmsPayloadExtractor.extractPayload(streamMessage);
+        long end = System.nanoTime();
+        long millis = TimeUnit.MILLISECONDS.toMillis(end - start);
+        System.out.println(millis);
+
+        assertTrue(esbMessage.getPayload() instanceof InputStream);
+        InputStream in = (InputStream)esbMessage.getPayload();
+        in.reset();
+
+        //  read from InputStream
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final byte[] buf = new byte[1024];
+        int len;
+        while ((len = in.read(buf)) > 0)
+        {
+            bos.write(buf, 0, len);
+        }
+        byte[] data = bos.toByteArray();
+        assertEquals(new String(content), new String(data));
+    }
+
 }




More information about the jboss-svn-commits mailing list