[jboss-cvs] JBoss Messaging SVN: r6470 - in trunk: examples/jms/application-layer-failover and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Apr 17 05:58:14 EDT 2009
Author: timfox
Date: 2009-04-17 05:58:14 -0400 (Fri, 17 Apr 2009)
New Revision: 6470
Modified:
trunk/.classpath
trunk/examples/jms/application-layer-failover/build.xml
trunk/examples/jms/clustered-queue/readme.html
trunk/examples/jms/common/build.xml
trunk/examples/jms/common/src/org/jboss/jms/example/JMSExample.java
trunk/examples/jms/common/src/org/jboss/jms/example/SpawnedVMSupport.java
trunk/examples/jms/interceptor/src/org/jboss/jms/example/SimpleInterceptor.java
trunk/examples/jms/large-message/
trunk/examples/jms/large-message/build.xml
trunk/examples/jms/large-message/readme.html
trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
Log:
finished off large message example
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/.classpath 2009-04-17 09:58:14 UTC (rev 6470)
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
- <classpathentry kind="src" path="examples/jms/interceptor/src"/>
<classpathentry kind="src" path="build/src"/>
<classpathentry kind="src" path="tests/jms-tests/config"/>
<classpathentry kind="src" path="tests/config"/>
@@ -15,6 +14,7 @@
<classpathentry kind="src" path="src/schemas"/>
<classpathentry kind="src" path="tests/joram-tests/src"/>
<classpathentry kind="src" path="tests/joram-tests/config"/>
+ <classpathentry kind="src" path="examples/jms/interceptor/src"/>
<classpathentry kind="src" path="examples/jms/application-layer-failover/src"/>
<classpathentry kind="src" path="examples/jms/automatic-failover/src"/>
<classpathentry kind="src" path="examples/jms/browser/src"/>
@@ -22,6 +22,7 @@
<classpathentry kind="src" path="examples/jms/clustered-durable-subscription/src"/>
<classpathentry kind="src" path="examples/jms/clustered-queue/src"/>
<classpathentry kind="src" path="examples/jms/clustered-topic/src"/>
+ <classpathentry kind="src" path="examples/jms/symmetric-cluster/src"/>
<classpathentry kind="src" path="examples/jms/bridge/src"/>
<classpathentry kind="src" path="examples/jms/common/src"/>
<classpathentry kind="src" path="examples/jms/dead-letter/src"/>
Modified: trunk/examples/jms/application-layer-failover/build.xml
===================================================================
--- trunk/examples/jms/application-layer-failover/build.xml 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/application-layer-failover/build.xml 2009-04-17 09:58:14 UTC (rev 6470)
@@ -45,8 +45,7 @@
<param name="jbm.example.runServer" value="false"/>
</antcall>
</target>
-
- <!-- Need to delete the server data dirs before each run since after failover cannot just rerun test since backup is now live -->
+
<target name="delete-files" depends="clean">
<delete file="./server1/KILL_ME"/>
</target>
Modified: trunk/examples/jms/clustered-queue/readme.html
===================================================================
--- trunk/examples/jms/clustered-queue/readme.html 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/clustered-queue/readme.html 2009-04-17 09:58:14 UTC (rev 6470)
@@ -1,6 +1,6 @@
<html>
<head>
- <title>JBoss Messaging JMS Load Balanced Queue Example/title>
+ <title>JBoss Messaging JMS Load Balanced Queue Example</title>
<link rel="stylesheet" type="text/css" href="../common/common.css">
</head>
<body>
Modified: trunk/examples/jms/common/build.xml
===================================================================
--- trunk/examples/jms/common/build.xml 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/common/build.xml 2009-04-17 09:58:14 UTC (rev 6470)
@@ -103,8 +103,8 @@
<target name="runExample" depends="compile">
- <property name="java-min-memory" value="512M"/>
- <property name="java-max-memory" value="2048M"/>
+ <property name="java-min-memory" value="512M"/>
+ <property name="java-max-memory" value="2048M"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms${java-min-memory}"/>
Modified: trunk/examples/jms/common/src/org/jboss/jms/example/JMSExample.java
===================================================================
--- trunk/examples/jms/common/src/org/jboss/jms/example/JMSExample.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/common/src/org/jboss/jms/example/JMSExample.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -53,6 +53,12 @@
private String[] configs;
+ private static final String[] defaultArgs = new String[] {"-Xms512M",
+ "-Xmx512M",
+ "-XX:+UseParallelGC",
+ "-XX:+AggressiveOpts",
+ "-XX:+UseFastAccessorMethods"};
+
protected void run(String[] serverVMArgs, String[] configs)
{
String runServerProp = System.getProperty("jbm.example.runServer");
@@ -98,7 +104,7 @@
}
catch (JMSException e)
{
- //ignore
+ // ignore
}
}
if (runServer)
@@ -118,16 +124,16 @@
protected void run(String[] args)
{
- run(null, args);
+ run(defaultArgs, args);
}
protected void killServer(int id) throws Exception
{
System.out.println("Killing server " + id);
- //We kill the server by creating a new file in the server dir which is checked for by the server
- //We can't use Process.destroy() since this does not do a hard kill - it causes shutdown hooks
- //to be called which cleanly shutdown the server
+ // We kill the server by creating a new file in the server dir which is checked for by the server
+ // We can't use Process.destroy() since this does not do a hard kill - it causes shutdown hooks
+ // to be called which cleanly shutdown the server
File file = new File("server" + id + "/KILL_ME");
file.createNewFile();
@@ -165,7 +171,17 @@
protected void startServer(int index) throws Exception
{
String config = configs[index];
- log.info("starting server with config '" + config + "' " + "logServerOutput " + logServerOutput);
+ log.info("starting server with config '" + config + "' " + "logServerOutput " + logServerOutput);
+ StringBuilder args = new StringBuilder();
+ for (int i = 0; i < allVMArgs.length; i++)
+ {
+ args.append(allVMArgs[i]);
+ if (i != allVMArgs.length - 1)
+ {
+ args.append(",");
+ }
+ }
+ log.info("and vm args: " + args.toString());
servers[index] = SpawnedVMSupport.spawnVM(
SpawnedJMSServer.class.getName(),
allVMArgs,
Modified: trunk/examples/jms/common/src/org/jboss/jms/example/SpawnedVMSupport.java
===================================================================
--- trunk/examples/jms/common/src/org/jboss/jms/example/SpawnedVMSupport.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/common/src/org/jboss/jms/example/SpawnedVMSupport.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -60,14 +60,12 @@
StringBuffer sb = new StringBuffer();
sb.append("java").append(' ');
-
- sb.append("-Xms512m -Xmx512m ");
-
+
for (String vmarg : vmargs)
{
sb.append(vmarg).append(' ');
}
-
+
String classPath = System.getProperty("java.class.path");
String pathSeparater = System.getProperty("path.separator");
classPath = classPath + pathSeparater + ".";
Modified: trunk/examples/jms/interceptor/src/org/jboss/jms/example/SimpleInterceptor.java
===================================================================
--- trunk/examples/jms/interceptor/src/org/jboss/jms/example/SimpleInterceptor.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/interceptor/src/org/jboss/jms/example/SimpleInterceptor.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -50,6 +50,9 @@
Message msg = realPacket.getServerMessage();
msg.putStringProperty(new SimpleString("newproperty"), new SimpleString("Hello from interceptor!"));
}
+ //We return true which means "call next interceptor" (if there is one) or target.
+ //If we returned false, it means "abort call" - no more interceptors would be called and neither would
+ //the target
return true;
}
Property changes on: trunk/examples/jms/large-message
___________________________________________________________________
Name: svn:ignore
- build
logs
+ build
logs
huge_message_received.dat
huge_message_to_send.dat
Modified: trunk/examples/jms/large-message/build.xml
===================================================================
--- trunk/examples/jms/large-message/build.xml 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/large-message/build.xml 2009-04-17 09:58:14 UTC (rev 6470)
@@ -32,20 +32,26 @@
<import file="../common/build.xml"/>
- <target name="run">
-
+ <target name="run" depends="delete-large-messages">
<antcall target="runExample">
<param name="example.classname" value="org.jboss.jms.example.LargeMessageExample"/>
- <param name="java-min-memory" value="128M"/>
- <param name="java-max-memory" value="128M"/>
+
+ <!-- We limit the client to running in only 50MB of RAM -->
+ <param name="java-min-memory" value="50M"/>
+ <param name="java-max-memory" value="50M"/>
</antcall>
</target>
- <target name="runRemote">
+ <target name="runRemote" depends="delete-large-messages">
<antcall target="runExample">
<param name="example.classname" value="org.jboss.jms.example.LargeMessageExample"/>
<param name="jbm.example.runServer" value="false"/>
</antcall>
</target>
+
+ <target name="delete-large-messages">
+ <delete file="huge_message_to_send.dat"/>
+ <delete file="huge_message_received.dat"/>
+ </target>
</project>
Modified: trunk/examples/jms/large-message/readme.html
===================================================================
--- trunk/examples/jms/large-message/readme.html 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/large-message/readme.html 2009-04-17 09:58:14 UTC (rev 6470)
@@ -7,53 +7,82 @@
<h1>Large Message Example</h1>
<br>
<p>This example shows you how to send and receive very large messages with JBoss Messaging.</p>
- <p>JBossMessaging provides an extension to JMS where you can use an InputStream or OutputStream as the source and destination for your messages. You can send messages as large as it would fit in your disk.</p>
- <p>You may also choose to read LargeMessages using the regular ByteStream or ByteMessage methods, but using the OutputStream will provide you a much better performance</p>
+ <p>JBoss Messaging supports the sending and receiving of huge messages, much larger than can fit in available RAM
+ on the client or server. Effectively the only limit to message size is the amount of disk space you have on the server.</p>
+ <p>Large messages are persisted on the server so they can survive a server restart. In other words JBoss Messaging doesn't just
+ do a simple socket stream from the sender to the consumer.</p>
+ <p>In order to do this JBossMessaging provides an extension to JMS where you can use an InputStream or OutputStream as the source and destination for your messages. You can send messages as large as it would fit in your disk.</p>
+ <p>You may also choose to read LargeMessages using the regular ByteStream or ByteMessage methods, but using the InputStream and OutputStream will provide you a much better performance</p>
<br>
- <h2>Example step-by-step</h2>
+ <h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
+ <p>In this example we limit both the server and the client to be running in a maximum of 50MB of RAM,
+ and we send a message with a body of size 256MB.</p>
+ <p>JBoss Messaging can support much larger message sizes but we
+ choose these sizes and limit RAM so the example runs more quickly.</p>
+ <p>We create a file on disk representing the message body, create
+ a FileInputStream on that file and set that InputStream as the body of the message before sending.</p>
+ <p>The message is sent, then we stop the server, and restart it. This demonstrates the large message will survive a restart of the server.</p>
+ <p>Once the server is restarted we receive the message and stream its body to another file on disk.</p>
<br>
<ol>
- <li>First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the <code>client-jndi.properties</code> file in the directory <code>../common/config</code></li>
+ <li>Create an initial context to perform the JNDI lookup.</li>
<pre>
- <code>InitialContext initialContext = getContext();</code>
+ <code>initialContext = getContext(0);</code>
</pre>
- <li>We look-up the JMS queue object from JNDI</li>
+ <li>Perform a lookup on the queue.</li>
<pre>
- <code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
+ <code>Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");</code>
</pre>
- <li>Perform a lookup on the Connection Factory. This ConnectionFactory has a special set on this example (jbm-jms.xml). Messages with more than 10K are considered large.</li>
+ <li>Perform a lookup on the Connection Factory. This ConnectionFactory has a special attribute set on it (see jbm-jms.xml).
+ Messages with more than 10K are considered large.</li>
<pre>
- <code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
+ <code>ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");</code>
</pre>
- <li>We create a JMS connection</li>
+ <li>Create the JMS objects for sending the message.</li>
<pre>
- <code>connection = cf.createConnection();</code>
+ <code>
+ connection = cf.createConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+ </code>
</pre>
- <li>We create a JMS session. The session is created as non transacted and will auto acknowledge messages.</li>
+ <li>Create a huge file - this will form the body of the message we will send.</li>
<pre>
- <code>Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);</code>
+ <code>
+ File fileInput = new File("huge_message_to_send.dat");
+
+ fileInput.createNewFile();
+
+ createFile(fileInput, FILE_SIZE);
+ </code>
</pre>
- <li>We create a JMS message producer on the session. This will be used to send the messages.</li>
+ <li>Create a BytesMessage</li>
<pre>
- <code>MessageProducer messageProducer = session.createProducer(topic);</code>
+ <code>BytesMessage message = session.createBytesMessage();</code>
</pre>
- <li>Create a BytesMessage</li>
+ <li>We set the InputStream on the message. When sending the message will read the InputStream
+ until it gets EOF. In this case we point the InputStream at a file on disk, and it will suck up the entire
+ file, however we could use any InputStream not just a FileInputStream.</li>
<pre><code>
- BytesMessage message = session.createBytesMessage();
+ FileInputStream fileInputStream = new FileInputStream(fileInput);
+
+ BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+
+ ((JBossMessage)message).setInputStream(bufferedInput);
</code></pre>
- <li>Set the File Stream</li>
+ <li>Send the Message.</li>
<pre><code>
- FileInputStream fileInputStream = new FileInputStream(fileInput);
- BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
- ((JBossMessage)message).setInputStream(bufferedInput);
+ producer.send(message);
</code></pre>
@@ -62,40 +91,78 @@
<code>messageProducer.send(message);</code>
</pre>
- <li>We create a JMS Message Consumer to receive the message.</li>
+ <li>
+ To demonstrate that we're not simply streaming the message from sender to consumer, we stop
+ the server and restart it before consuming the message. This demonstrates that the large message gets persisted, like a
+ normal persistent message, on the server. If you look at ./build/data/largeMessages you will see the largeMessage
+ stored on disk on the server.
+ </li>
<pre>
- <code>MessageConsumer messageConsumer = session.createConsumer(queue);</code>
+ <code>
+ connection.close();
+
+ initialContext.close();
+
+ stopServer(0);
+
+ // Give the server a little time to shutdown properly
+ Thread.sleep(5000);
+
+ startServer(0);
+ </code>
</pre>
- <li>We start the connection. In order for delivery to occur on any consumers or subscribers on a connection, the connection must be started</li>
+ <li>Now the server is restarted we can recreate the JMS Objects, and start the new connection.</li>
<pre>
- <code>connection.start();</code>
+ <code>
+ initialContext = getContext(0);
+
+ queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+
+ cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ connection = cf.createConnection();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ connection.start();
+
+ </code>
</pre>
- <li>Receive the message'</li>
- <pre><code>
- BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
- System.out.println("Received message: " + messageReceived.getBodyLength() + " bytes");
+ <li>Receive the message. When we receive the large message we initially just receive the message with
+ an empty body.</li>
+ <pre><code>BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
</code></pre>
+ <li>We set an OutputStream on the message. This causes the message body to be written to the
+ OutputStream until there are no more bytes to be written.
+ You don't have to use a FileOutputStream, you can use any OutputStream.
+ You may choose to use the regular BytesMessage or
+ StreamMessage interface but this method is much faster for large messages.</li>
+ <pre><code>
+ File outputFile = new File("huge_message_received.dat");
- <li>Setting a stream to receive the message. You may choose to use the regular BytesMessage or STreamMessage interface but this method is much faster for large messages.</li>
- <pre><code>
- FileOutputStream fileOutputStream = new FileOutputStream(fileOutput);
+ FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
+
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
+
((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
</code></pre>
- <li>setOutputStream is a non-blocking operation. You may choose to wait the streaming completion.</li>
+ <li>We wait until the entire message is written before continuing.</li>
<pre><code>
((JBossMessage)messageReceived).waitCompletionOnStream(300000);
</code></pre>
- <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+ <li>Be sure to close our resources!</li>
<pre>
- <code>finally
+ <code>
+ finally
{
if (initialContext != null)
{
@@ -107,9 +174,6 @@
}
}</code>
</pre>
-
-
-
</ol>
</body>
</html>
\ No newline at end of file
Modified: trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -41,26 +41,34 @@
import org.jboss.messaging.jms.client.JBossMessage;
/**
- * A simple JMS Queue example that creates a producer and consumer on a queue and sends then receives a message.
- *
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * This example demonstrates the ability of JBoss Messaging to send and consume a very large message, much
+ * bigger than can fit in RAM.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
*/
public class LargeMessageExample extends JMSExample
{
public static void main(String[] args)
{
- new LargeMessageExample().run(args);
+ // We limit the server to running in only 50MB of RAM
+ String[] serverJMXArgs = new String[] {"-Xms50M",
+ "-Xmx50M",
+ "-XX:+UseParallelGC",
+ "-XX:+AggressiveOpts",
+ "-XX:+UseFastAccessorMethods"};
+
+ new LargeMessageExample().run(serverJMXArgs, args);
}
- private final long FILE_SIZE = 4l * 1024l * 1024l * 1024l; // 4G (if you want to change this size, make it a multiple
- // of 1024 * 1024)
+ //The message we will send is size 256MB, even though we are only running in 50MB of RAM on both client and server.
+ //JBoss Messaging will support much larger message sizes, but we use 256MB so the example runs in reasonable time.
+ private final long FILE_SIZE = 256 * 1024 * 1024;
public boolean runExample() throws Exception
{
Connection connection = null;
InitialContext initialContext = null;
- File fileInput = File.createTempFile("example", ".jbm");
- File fileOutput = File.createTempFile("example", ".jbm");
+
try
{
// Step 1. Create an initial context to perform the JNDI lookup.
@@ -69,87 +77,126 @@
// Step 2. Perfom a lookup on the queue
Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
- // Step 3. Perform a lookup on the Connection Factory. This ConnectionFactory has a special set on this
- // example. Messages with more than 10K are considered large
+ // Step 3. Perform a lookup on the Connection Factory. This ConnectionFactory has a special attribute set on
+ // it.
+ // Messages with more than 10K are considered large
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- // Step 4.Create a JMS Connection
+ // Step 4. Create the JMS objects
connection = cf.createConnection();
- // Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
- // creating an arbitray file
+ // Step 5. Create a huge file - this will form the body of the message we will send.
+
+ System.out.println("Creating a file to send of size " + FILE_SIZE +
+ " bytes. This may take a little while... " +
+ "If this is too big for your disk you can easily change the FILE_SIZE in the example.");
+
+ File fileInput = new File("huge_message_to_send.dat");
+
+ fileInput.createNewFile();
+
createFile(fileInput, FILE_SIZE);
- // Step 7. Create a BytesMessage
+ System.out.println("File created.");
+
+ // Step 6. Create a BytesMessage - does it have to be a bytesmessage??
BytesMessage message = session.createBytesMessage();
-
- // Step 8. Set the InputStream
+ // Step 7. We set the InputStream on the message. When sending the message will read the InputStream
+ // until it gets EOF. In this case we point the InputStream at a file on disk, and it will suck up the entire
+ // file, however we could use any InputStream not just a FileInputStream.
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
((JBossMessage)message).setInputStream(bufferedInput);
+ System.out.println("Sending the huge message.");
+
// Step 9. Send the Message
producer.send(message);
System.out.println("Large Message sent");
- // if you sleep the example and look at ./build/data/largeMessages you will see the largeMessage stored on disk
+ System.out.println("Stopping server.");
- // Step 10. Create a JMS Message Consumer
+ // Step 10. To demonstrate that we're not simply streaming the message from sender to consumer, we stop
+ // the server and restart it before consuming the message. This demonstrates that the large message gets
+ // persisted, like a
+ // normal persistent message, on the server. If you look at ./build/data/largeMessages you will see the
+ // largeMessage stored on disk on the server
+
+ connection.close();
+
+ initialContext.close();
+
+ stopServer(0);
+
+ // Give the server a little time to shutdown properly
+ Thread.sleep(5000);
+
+ startServer(0);
+
+ System.out.println("Server restarted.");
+
+ // Step 11. Now the server is restarted we can recreate the JMS Objects, and start the new connection
+
+ initialContext = getContext(0);
+
+ queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+
+ cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ connection = cf.createConnection();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
MessageConsumer messageConsumer = session.createConsumer(queue);
- // Step 11. Start the Connection
connection.start();
- // Step 12. Receive the message
+ System.out.println("Receiving message.");
+
+ // Step 12. Receive the message. When we receive the large message we initially just receive the message with
+ // an empty body.
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
- System.out.println("Received message with: " + messageReceived.getBodyLength() + " bytes");
+ System.out.println("Received message with: " + messageReceived.getBodyLength() +
+ " bytes. Now streaming to file on disk.");
- FileOutputStream fileOutputStream = new FileOutputStream(fileOutput);
+ // Step 13. We set an OutputStream on the message. This causes the message body to be written to the
+ // OutputStream until there are no more bytes to be written.
+ // You don't have to use a FileOutputStream, you can use any OutputStream.
+ // You may choose to use the regular BytesMessage or
+ // StreamMessage interface but this method is much faster for large messages.
+
+ File outputFile = new File("huge_message_received.dat");
+
+ FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
+
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
- // Step 13. Setting a stream to receive the message. You may choose to use the regular BytesMessage or STreamMessage interface but this method is much faster for large messages.
((JBossMessage)messageReceived).setOutputStream(bufferedOutput);
-
- // Step 14. We don' t want to close the connection while the message is being processed.
+
+ // Step 14. We wait until the entire message is written before continuing.
((JBossMessage)messageReceived).waitCompletionOnStream(300000);
+ fileOutputStream.close();
- initialContext.close();
+ System.out.println("File streamed to disk. Size of received file on disk is " + outputFile.length());
return true;
}
finally
{
- // Deleting the tmporary files created
- try
- {
- fileInput.delete();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- fileOutput.delete();
- }
- catch (Throwable ignored)
- {
- }
-
- // Step 12. Be sure to close our JMS resources!
+ // Step 15. Be sure to close our resources!
if (initialContext != null)
{
initialContext.close();
}
+
if (connection != null)
{
connection.close();
@@ -158,17 +205,17 @@
}
/**
- * @param tmpFile
+ * @param file
* @param fileSize
* @throws FileNotFoundException
* @throws IOException
*/
- private void createFile(File tmpFile, long fileSize) throws FileNotFoundException, IOException
+ private void createFile(File file, long fileSize) throws FileNotFoundException, IOException
{
- FileOutputStream fileOut = new FileOutputStream(tmpFile);
+ FileOutputStream fileOut = new FileOutputStream(file);
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
byte[] outBuffer = new byte[1024 * 1024];
- for (long i = 0; i < fileSize; i += outBuffer.length) // 4G message
+ for (long i = 0; i < fileSize; i += outBuffer.length)
{
buffOut.write(outBuffer);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -497,14 +497,6 @@
creditsToSend = 0;
sendCredits(credits);
-
- // sessionExecutor.execute(new Runnable()
- // {
- // public void run()
- // {
- // sendCredits(credits);
- // }
- // });
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2009-04-17 08:52:25 UTC (rev 6469)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2009-04-17 09:58:14 UTC (rev 6470)
@@ -169,7 +169,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
- public void saveToOutputStream(OutputStream out) throws MessagingException
+ public void saveToOutputStream(final OutputStream out) throws MessagingException
{
if (largeMessage)
{
@@ -192,7 +192,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.client.ClientMessage#setOutputStream(java.io.OutputStream)
*/
- public void setOutputStream(OutputStream out) throws MessagingException
+ public void setOutputStream(final OutputStream out) throws MessagingException
{
if (largeMessage)
{
@@ -208,7 +208,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.client.ClientMessage#waitOutputStreamCompletion()
*/
- public boolean waitOutputStreamCompletion(long timeMilliseconds) throws MessagingException
+ public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws MessagingException
{
if (largeMessage)
{
More information about the jboss-cvs-commits
mailing list