[jboss-svn-commits] JBL Code SVN: r22517 - in labs/jbossesb/workspace/skeagh: routing/jms/src/main/java/org/jboss/esb/jms and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Sep 8 11:03:56 EDT 2008
Author: beve
Date: 2008-09-08 11:03:55 -0400 (Mon, 08 Sep 2008)
New Revision: 22517
Added:
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java
Modified:
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java
labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java
Log:
Added an adaptor for StreamMessage to InputStream.
Added: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/StreamMessageInputStream.java 2008-09-08 15:03:55 UTC (rev 22517)
@@ -0,0 +1,187 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2008, Red Hat Middleware
+ * LLC, and individual contributors by the @authors tag. See the copyright.txt
+ * in the distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.esb.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.StreamMessage;
+
+/**
+ * Adapts/Wraps a javax.jms.StreamMessage to be compatible with an InputStream.
+ * <p>
+ * Only {@link #read(byte[])}, {@link #reset()}, {@link #close()} and
+ * {@link #markSupported()} are usable. {@link #read()},
+ * {@link #read(byte[], int, int)}, {@link #available()}, {@link #skip(long)}
+ * and {@link #mark(int)} throw UnsupportedOperationException.
+ *
+ * @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
+ *
+ */
+public class StreamMessageInputStream extends InputStream
+{
+    /**
+     * StreamMessage being wrapped.
+     */
+    private final StreamMessage streamMessage;
+
+    /**
+     * Sole constructor that takes the StreamMessage that is to be wrapped.
+     * @param message the StreamMessage to adapt/wrap
+     */
+    public StreamMessageInputStream(final StreamMessage message)
+    {
+        this.streamMessage = message;
+    }
+
+    /**
+     * No operation.
+     *
+     * @throws IOException - cannot be thrown as this impl does nothing.
+     */
+    @Override
+    public final void close() throws IOException
+    {
+        // NoOp
+    }
+
+    /**
+     * Reads bytes from the wrapped StreamMessage into the passed in byte[]
+     * by delegating to {@link StreamMessage#readBytes(byte[])}.
+     *
+     * @param b the byte[] into which data should be read.
+     * @return int the value returned by readBytes, or -1 if the end of the
+     *         message stream has been reached.
+     * @throws IOException if the StreamMessage could not be read for any
+     *         reason other than reaching the end of the message stream.
+     */
+    @Override
+    public final int read(final byte[] b) throws IOException
+    {
+        try
+        {
+            return streamMessage.readBytes(b);
+        }
+        catch (final MessageEOFException e)
+        {
+            // Running off the end of the message is reported as end-of-stream.
+            return -1;
+        }
+        catch (final JMSException e)
+        {
+            // Any other failure is a real error and must not look like end-of-stream.
+            throw toIOException(e);
+        }
+    }
+
+    /**
+     * Will try to call reset on the underlying StreamMessage.
+     *
+     * @throws IOException if an exception occurs while trying to reset. The
+     *         JMSException is available as the cause.
+     * @see java.io.InputStream#markSupported()
+     */
+    @Override
+    public final synchronized void reset() throws IOException
+    {
+        try
+        {
+            streamMessage.reset();
+        }
+        catch (final JMSException e)
+        {
+            throw toIOException(e);
+        }
+    }
+
+    /**
+     * Will always return false for this implementation.
+     *
+     * @return false this method will always return false.
+     */
+    @Override
+    public final boolean markSupported()
+    {
+        return false;
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thrown.
+     *
+     * @return int Cannot be returned.
+     * @throws IOException Cannot be thrown.
+     */
+    @Override
+    public final int read() throws IOException
+    {
+        throw new UnsupportedOperationException("read() is not supported by this implementation.");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thrown.
+     *
+     * @return int Cannot be returned
+     * @throws IOException Cannot be thrown.
+     *
+     * @see InputStream#available()
+     */
+    @Override
+    public final int available() throws IOException
+    {
+        throw new UnsupportedOperationException("available() is not supported by this implementation.");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thrown.
+     *
+     * @param b ignored.
+     * @param off ignored.
+     * @param len ignored.
+     * @return int Cannot be returned.
+     * @throws IOException Cannot be thrown.
+     */
+    @Override
+    public final int read(final byte[] b, final int off, final int len) throws IOException
+    {
+        throw new UnsupportedOperationException("read(byte[], int, int) is not supported by this implementation.");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thrown.
+     *
+     * @param n ignored
+     * @return long Cannot be returned.
+     * @throws IOException Cannot be thrown.
+     */
+    @Override
+    public final long skip(final long n) throws IOException
+    {
+        throw new UnsupportedOperationException("skip method not supported by this implementation");
+    }
+
+    /**
+     * Not supported by this implementation. UnsupportedOperationException will be thrown.
+     *
+     * @param readlimit ignored
+     */
+    @Override
+    public final synchronized void mark(final int readlimit)
+    {
+        throw new UnsupportedOperationException("mark(int)");
+    }
+
+    /**
+     * Returns the underlying StreamMessage.
+     *
+     * @return StreamMessage the StreamMessage being wrapped.
+     */
+    public final StreamMessage getStreamMessage()
+    {
+        return streamMessage;
+    }
+
+    /**
+     * Converts a JMSException into an IOException that carries the same
+     * message and keeps the JMSException as its cause.
+     *
+     * @param e the JMSException to convert.
+     * @return IOException the exception to throw to the InputStream caller.
+     */
+    private static IOException toIOException(final JMSException e)
+    {
+        final IOException ioe = new IOException(e.getMessage());
+        // initCause is used because it is available on every Java version.
+        ioe.initCause(e);
+        return ioe;
+    }
+
+}
Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java 2008-09-08 14:21:34 UTC (rev 22516)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsPayloadExtractor.java 2008-09-08 15:03:55 UTC (rev 22517)
@@ -44,11 +44,6 @@
public final class JmsPayloadExtractor
{
/**
- * Logger instance.
- */
- private static final Logger LOGGER = Logger.getLogger(JmsPayloadExtractor.class);
-
- /**
* Private constructor.
*/
private JmsPayloadExtractor()
@@ -57,7 +52,16 @@
}
/**
- * Add docs.
+ * Extracts the content from the passed in JMSMessage object.
+ * <br>
+ * The following conversions are done based on the message type:
+ * <ul>
+ * <li>TextMessage -> java.lang.String</li>
+ * <li>ObjectMessage -> java.lang.Object</li>
+ * <li>BytesMessage -> byte[]</li>
+ * <li>MapMessage -> java.util.Map</li>
+ * <li>StreamMessage -> org.jboss.esb.jms.StreamMessageInputStream</li>
+ * </ul>
*
* @param jmsMessage the JMS Message object instance
* @return Message the populated ESB Message object instance.
@@ -88,61 +92,31 @@
}
else if (jmsMessage instanceof BytesMessage)
{
- BytesMessage bytesMessage = (BytesMessage)jmsMessage;
- bytesMessage.reset();
- int msgLength = 0;
- final int buffSize = 50000;
- byte[] data = new byte[buffSize];
- // calculate the nr of bytes in the message
- while (true)
- {
- int len = bytesMessage.readBytes(data);
- if (len > 0)
- {
- msgLength += len;
- }
- else
- {
- break;
- }
- }
- if (msgLength == 0)
- {
- throw new RoutingException("Content in JMSMessage [" + jmsMessage + "] was of zero length");
- }
- byte[] content = new byte[msgLength];
- if (msgLength <= buffSize)
- {
- System.arraycopy(data, 0, content, 0, msgLength);
- }
- else
- {
- bytesMessage.reset();
- bytesMessage.readBytes(content);
- }
+ byte[] content = readBytes(jmsMessage);
esbMessage.setPayload(content);
}
else if (jmsMessage instanceof MapMessage)
{
- final MapMessage map = (MapMessage)jmsMessage;
- final Map newMap = new HashMap();
+ final MapMessage jmsMap = (MapMessage)jmsMessage;
+ final Map<String,Object> esbMap = new HashMap<String,Object>();
- Enumeration mapNames = map.getMapNames();
+ final Enumeration<?> mapNames = jmsMap.getMapNames();
while (mapNames.hasMoreElements())
{
final String key = (String)mapNames.nextElement();
- final Object value = map.getObject(key);
- newMap.put(key, value);
+ final Object value = jmsMap.getObject(key);
+ esbMap.put(key, value);
}
- if (newMap.isEmpty())
+ if (esbMap.isEmpty())
{
throw new RoutingException("Map in JMSMessage [" + jmsMessage + "] contained zero key value pairs.");
}
- esbMessage.setPayload(newMap);
+ esbMessage.setPayload(esbMap);
}
else if (jmsMessage instanceof StreamMessage)
{
- LOGGER.info("StreamMessage");
+ StreamMessage streamMessage = (StreamMessage)jmsMessage;
+ esbMessage.setPayload(new StreamMessageInputStream(streamMessage));
}
else
{
@@ -156,4 +130,41 @@
return esbMessage;
}
+    /**
+     * Reads the complete body of a BytesMessage into a byte[].
+     * <p>
+     * The message is reset before its length is queried and its body is read.
+     *
+     * @param jmsMessage the JMS message, which must be a BytesMessage.
+     * @return byte[] the content of the message body.
+     * @throws JMSException if the message cannot be reset or read.
+     * @throws RoutingException if the message body is of zero length or is
+     *         too large to fit into a byte[].
+     */
+    private static byte[] readBytes(final javax.jms.Message jmsMessage) throws JMSException, RoutingException
+    {
+        final BytesMessage bytesMessage = (BytesMessage)jmsMessage;
+        bytesMessage.reset();
+        // Ask the message for its length instead of reading it through once just to count bytes.
+        final long bodyLength = bytesMessage.getBodyLength();
+        if (bodyLength == 0)
+        {
+            throw new RoutingException("Content in JMSMessage [" + jmsMessage + "] was of zero length");
+        }
+        if (bodyLength > Integer.MAX_VALUE)
+        {
+            throw new RoutingException("Content in JMSMessage [" + jmsMessage + "] is too large to be read into a byte[]");
+        }
+        final byte[] content = new byte[(int)bodyLength];
+        bytesMessage.readBytes(content);
+        return content;
+    }
+
}
Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java 2008-09-08 14:21:34 UTC (rev 22516)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsPayloadExtractorTest.java 2008-09-08 15:03:55 UTC (rev 22517)
@@ -23,6 +23,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
import javax.jms.JMSException;
@@ -33,8 +36,11 @@
import com.mockrunner.mock.jms.MockBytesMessage;
import com.mockrunner.mock.jms.MockMapMessage;
import com.mockrunner.mock.jms.MockObjectMessage;
+import com.mockrunner.mock.jms.MockStreamMessage;
import com.mockrunner.mock.jms.MockTextMessage;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
/**
* Test for {@link JmsPayloadExtractor}.
*
@@ -122,4 +128,51 @@
JmsPayloadExtractor.extractPayload(mapMessage);
}
+    /**
+     * Checks that the payload extracted from a StreamMessage is a
+     * StreamMessageInputStream whose wrapped message still yields the
+     * byte[] that was written to it.
+     */
+    @Test
+    public void extractContentStreamMessageStreamAdapter() throws RoutingException, JMSException, ClassNotFoundException, IOException
+    {
+        final byte[] expected = "testing text message content".getBytes();
+        final MockStreamMessage jmsMessage = new MockStreamMessage();
+        jmsMessage.writeObject(expected);
+
+        final Object payload = JmsPayloadExtractor.extractPayload(jmsMessage).getPayload();
+        assertTrue(payload instanceof InputStream);
+
+        final StreamMessageInputStream adapter = (StreamMessageInputStream)payload;
+        // Reset so that the wrapped message can be read from the start.
+        adapter.reset();
+        final Object field = adapter.getStreamMessage().readObject();
+        assertTrue(field instanceof byte[]);
+        assertEquals(new String(expected), new String((byte[])field));
+    }
+
+    /**
+     * Checks that the payload extracted from a StreamMessage can be consumed
+     * through the plain InputStream API and yields the bytes that were
+     * written to the message. The time taken by the extraction is printed
+     * in milliseconds.
+     */
+    @Test
+    public void extractContentStreamMessageInputStream() throws RoutingException, JMSException, ClassNotFoundException, IOException
+    {
+        final byte[] content = "testing text message content".getBytes();
+        final MockStreamMessage streamMessage = new MockStreamMessage();
+        streamMessage.writeObject(content);
+
+        long start = System.nanoTime();
+        final Message esbMessage = JmsPayloadExtractor.extractPayload(streamMessage);
+        long end = System.nanoTime();
+        // The delta is in nanoseconds, so the conversion has to start from NANOSECONDS.
+        long millis = TimeUnit.NANOSECONDS.toMillis(end - start);
+        System.out.println(millis);
+
+        assertTrue(esbMessage.getPayload() instanceof InputStream);
+        InputStream in = (InputStream)esbMessage.getPayload();
+        in.reset();
+
+        // read from InputStream
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final byte[] buf = new byte[1024];
+        int len;
+        while ((len = in.read(buf)) > 0)
+        {
+            bos.write(buf, 0, len);
+        }
+        byte[] data = bos.toByteArray();
+        assertEquals(new String(content), new String(data));
+    }
+
}
More information about the jboss-svn-commits
mailing list