[jboss-cvs] JBoss Messaging SVN: r6483 - in trunk: examples/jms/large-message and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Apr 17 13:50:33 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-17 13:50:33 -0400 (Fri, 17 Apr 2009)
New Revision: 6483
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
Modified:
trunk/build.xml
trunk/examples/jms/large-message/readme.html
trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
Log:
tweaks on LargeMessage and adding dev-tests target on build.xml
Modified: trunk/build.xml
===================================================================
--- trunk/build.xml 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/build.xml 2009-04-17 17:50:33 UTC (rev 6483)
@@ -241,6 +241,11 @@
<ant antfile="build-messaging.xml" target="hudson-tests"/>
</target>
+ <target name="dev-tests" depends="createthirdparty">
+ <ant antfile="build-messaging.xml" target="hudson-tests"/>
+ <ant antfile="build-messaging.xml" target="compile-reports"/>
+ </target>
+
<target name="tests" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="tests"/>
<ant antfile="build-messaging.xml" target="compile-reports"/>
Modified: trunk/examples/jms/large-message/readme.html
===================================================================
--- trunk/examples/jms/large-message/readme.html 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/examples/jms/large-message/readme.html 2009-04-17 17:50:33 UTC (rev 6483)
@@ -77,7 +77,7 @@
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
- ((JBossMessage)message).setInputStream(bufferedInput);
+ message.setObjectProperty("JMS_JBM_InputStream", bufferedInput);
</code></pre>
<li>Send the Message.</li>
@@ -149,15 +149,13 @@
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
- ((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
</code></pre>
- <li>We wait until the entire message is written before continuing.</li>
+ <li>This will save the stream and wait until the entire message is written before continuing.</li>
<pre><code>
- ((JBossMessage)messageReceived).waitCompletionOnStream(300000);
+ messageReceived.setObjectProperty("JMS_JBM_SaveStream", bufferedOutput);
</code></pre>
-
<li>Be sure to close our resources!</li>
<pre>
Modified: trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -38,8 +38,6 @@
import javax.jms.Session;
import javax.naming.InitialContext;
-import org.jboss.messaging.jms.client.JBossMessage;
-
/**
* This example demonstrates the ability of JBoss Messaging to send and consume a very large message, much
* bigger than can fit in RAM.
@@ -111,7 +109,8 @@
// file, however we could use any InputStream not just a FileInputStream.
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
- ((JBossMessage)message).setInputStream(bufferedInput);
+
+ message.setObjectProperty("JMS_JBM_InputStream", bufferedInput);
System.out.println("Sending the huge message.");
@@ -178,11 +177,9 @@
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
- ((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
+ // Step 14. This will save the stream and wait until the entire message is written before continuing.
+ messageReceived.setObjectProperty("JMS_JBM_SaveStream", bufferedOutput);
- // Step 14. We wait until the entire message is written before continuing.
- ((JBossMessage)messageReceived).waitCompletionOnStream(300000);
-
fileOutputStream.close();
System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length());
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -45,6 +45,7 @@
* A StorageManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*
*/
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -270,7 +270,7 @@
}
}
- // TODO: Optimize this per https://jira.jboss.org/jira/browse/JBMESSAGING-1468
+ // TODO: Optimise this per https://jira.jboss.org/jira/browse/JBMESSAGING-1496
@Override
public synchronized ServerMessage copy(final long newID) throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -22,6 +22,7 @@
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -52,7 +53,9 @@
* @author Hiram Chirino (Cojonudo14 at hotmail.com)
* @author David Maplesden (David.Maplesden at orion.co.nz)
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a> $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * $Id: JBossMessage.java 3466 2007-12-10 18:44:52Z timfox $
*/
public class JBossMessage implements javax.jms.Message
{
@@ -73,7 +76,13 @@
private static final SimpleString JMS_ = new SimpleString("JMS_");
public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
+
+ public static final String JMS_JBM_INPUT_STREAM = "JMS_JBM_InputStream";
+ public static final String JMS_JBM_OUTPUT_STREAM = "JMS_JBM_OutputStream";
+
+ public static final String JMS_JBM_SAVE_STREAM = "JMS_JBM_SaveStream";
+
public static final String JMSXGROUPID = "JMSXGroupID";
// Used when bridging a message
@@ -755,6 +764,11 @@
public Object getObjectProperty(final String name) throws JMSException
{
+ if (JMS_JBM_INPUT_STREAM.equals(name))
+ {
+ return message.getBodyInputStream();
+ }
+ else
if (JMSXDELIVERYCOUNT.equals(name))
{
return String.valueOf(message.getDeliveryCount());
@@ -849,8 +863,28 @@
public void setObjectProperty(final String name, final Object value) throws JMSException
{
+
+ if (JMS_JBM_OUTPUT_STREAM.equals(name))
+ {
+ this.setOutputStream((OutputStream)value);
+ return;
+ }
+ else
+ if (JMS_JBM_SAVE_STREAM.equals(name))
+ {
+ this.saveToOutputStream((OutputStream)value);
+ return;
+ }
+
checkProperty(name, value);
+
+ if (JMS_JBM_INPUT_STREAM.equals(name))
+ {
+ this.setInputStream((InputStream)value);
+ return;
+ }
+
SimpleString key = new SimpleString(name);
if (value instanceof Boolean)
@@ -933,25 +967,64 @@
}
- public void setInputStream(final InputStream input) throws MessagingException
+ public void setInputStream(final InputStream input) throws JMSException
{
+ checkStream();
+ if (readOnly)
+ {
+ throw new MessageNotWriteableException("Message is read-only");
+ }
+
message.setBodyInputStream(input);
}
-
- public void setOutputStream(final OutputStream output) throws MessagingException
+ public void setOutputStream(final OutputStream output) throws JMSException
{
- message.setOutputStream(output);
+ checkStream();
+ if (!readOnly)
+ {
+ throw new IllegalStateException("OutputStream property is only valid on received messages");
+ }
+
+ try
+ {
+ message.setOutputStream(output);
+ }
+ catch (MessagingException e)
+ {
+ throw JMSExceptionHelper.convertFromMessagingException(e);
+ }
}
- public void saveToOutputStream(final OutputStream output) throws MessagingException
+ public void saveToOutputStream(final OutputStream output) throws JMSException
{
- message.saveToOutputStream(output);
+ checkStream();
+ if (!readOnly)
+ {
+ throw new IllegalStateException("OutputStream property is only valid on received messages");
+ }
+
+ try
+ {
+ message.saveToOutputStream(output);
+ }
+ catch (MessagingException e)
+ {
+ throw JMSExceptionHelper.convertFromMessagingException(e);
+ }
}
- public boolean waitCompletionOnStream(long timeWait) throws MessagingException
+ public boolean waitCompletionOnStream(long timeWait) throws JMSException
{
- return message.waitOutputStreamCompletion(timeWait);
+ checkStream();
+ try
+ {
+ return message.waitOutputStreamCompletion(timeWait);
+ }
+ catch (MessagingException e)
+ {
+ throw JMSExceptionHelper.convertFromMessagingException(e);
+ }
}
@@ -992,6 +1065,14 @@
// Private ------------------------------------------------------------
+ private void checkStream() throws JMSException
+ {
+ if (!(message.getType() == JBossBytesMessage.TYPE || message.getType() == JBossStreamMessage.TYPE))
+ {
+ throw new IllegalStateException("LargeMessage streaming is only possible on ByteMessage or StreamMessage");
+ }
+ }
+
private void checkProperty(final String name, final Object value) throws JMSException
{
if (propertiesReadOnly)
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -30,12 +30,12 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
-import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
-import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.test.messaging.jms.JMSTestCase;
/**
@@ -68,11 +68,10 @@
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(createFakeLargeStream(1024 * 1024));
+ m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(1024 * 1024));
prod.send(m);
@@ -127,11 +126,10 @@
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(createFakeLargeStream(10));
+ m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(10));
prod.send(m);
@@ -171,11 +169,82 @@
}
+ public void testExceptionsOnSettingNonStreaming() throws Exception
+ {
+ Connection conn = null;
+ try
+ {
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ TextMessage msg = session.createTextMessage();
+
+ try
+ {
+ msg.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(10));
+ fail("Exception was expected");
+ }
+ catch (JMSException e)
+ {
+ }
+
+ msg.setText("hello");
+
+ MessageProducer prod = session.createProducer(queue1);
+
+ prod.send(msg);
+
+ conn.close();
+
+ conn = cf.createConnection();
+
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue1);
+
+ conn.start();
+
+ TextMessage rm = (TextMessage)cons.receive(10000);
+
+ try
+ {
+ rm.setObjectProperty("JMS_JBM_OutputStream", new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ System.out.println("b = " + b);
+ }
+
+ });
+ fail("Exception was expected");
+ }
+ catch (JMSException e)
+ {
+ }
+
+
+ assertEquals("hello", rm.getText());
+
+ assertNotNull(rm);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
public void testWaitOnOutputStream() throws Exception
{
int msgSize = 1024 * 1024;
-
+
Connection conn = null;
try
@@ -185,11 +254,10 @@
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(createFakeLargeStream(msgSize));
+ m.setObjectProperty("JMS_JBM_InputStream", createFakeLargeStream(msgSize));
prod.send(m);
@@ -207,13 +275,14 @@
assertNotNull(rm);
final AtomicLong numberOfBytes = new AtomicLong(0);
-
+
final AtomicInteger numberOfErrors = new AtomicInteger(0);
OutputStream out = new OutputStream()
{
int position = 0;
+
@Override
public void write(int b) throws IOException
{
@@ -224,18 +293,14 @@
numberOfErrors.incrementAndGet();
}
}
-
+
};
-
- ((JBossMessage)rm).setOutputStream(out);
-
- assertTrue(((JBossMessage)rm).waitCompletionOnStream(10000));
-
+ rm.setObjectProperty("JMS_JBM_SaveStream", out);
+
assertEquals(msgSize, numberOfBytes.get());
-
+
assertEquals(0, numberOfErrors.get());
-
}
finally
@@ -251,14 +316,14 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
- protected byte getSamplebyte(final long position)
+
+ protected byte getSamplebyte(final long position)
{
return (byte)('a' + (position) % ('z' - 'a' + 1));
}
// Creates a Fake LargeStream without using a real file
- protected InputStream createFakeLargeStream(final long size) throws Exception
+ protected InputStream createFakeLargeStream(final long size) throws Exception
{
return new InputStream()
{
@@ -294,8 +359,6 @@
}
-
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java 2009-04-17 17:34:02 UTC (rev 6482)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java 2009-04-17 17:50:33 UTC (rev 6483)
@@ -1,74 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ClientLargeMessageTest extends ServiceTestBase
-{
- public final SimpleString addressA = new SimpleString("addressA");
-
- public final SimpleString queueA = new SimpleString("queueA");
-
- public void testSendConsumeLargeMessage() throws Exception
- {
- MessagingServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = createInVMFactory();
- cf.setMinLargeMessageSize(1000);
- ClientSession sendSession = cf.createSession(false, true, true);
- ClientSession recSession = cf.createSession(false, true, true);
- sendSession.createQueue(addressA, queueA, false);
- ClientProducer cp = sendSession.createProducer(addressA);
- ClientConsumer cc = recSession.createConsumer(queueA);
- recSession.start();
- ClientMessage message = recSession.createClientMessage(false);
- byte[] bytes = new byte[3000];
- message.getBody().writeBytes(bytes);
- cp.send(message);
- ClientMessage m = cc.receive(5000);
- assertNotNull(m);
- byte[] recBytes = new byte[3000];
- m.getBody().readBytes(recBytes);
- assertEqualsByteArrays(bytes, recBytes);
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
-}
More information about the jboss-cvs-commits
mailing list