JBoss hornetq SVN: r7945 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-09 09:42:27 -0400 (Wed, 09 Sep 2009)
New Revision: 7945
Modified:
trunk/docs/user-manual/en/pre-acknowledge.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-116
Modified: trunk/docs/user-manual/en/pre-acknowledge.xml
===================================================================
--- trunk/docs/user-manual/en/pre-acknowledge.xml 2009-09-09 13:06:55 UTC (rev 7944)
+++ trunk/docs/user-manual/en/pre-acknowledge.xml 2009-09-09 13:42:27 UTC (rev 7945)
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
-
<!-- ============================================================================= -->
<!-- Copyright © 2009 Red Hat, Inc. and others. -->
<!-- -->
@@ -17,7 +16,6 @@
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
-
<chapter id="pre-acknowledge">
<title>Pre-Acknowledge Mode</title>
<para>JMS specifies 3 acknowledgement modes:</para>
@@ -36,6 +34,12 @@
server. However, in the case where you can afford to lose messages in event of failure, it
would make sense to acknowledge the message on the server <emphasis>before</emphasis>
delivering it to the client.</para>
+ <note>
+ <para>Please note, that if you use pre-acknowledge mode, then you will lose transactional
+ semantics for messages being consumed, since clearly they are being acknowledged first on
+ the server, not when you commit the transaction. This may be stating the obvious but we
+ like to be clear on these things to avoid confusion!</para>
+ </note>
<para>The disadvantage of acknowledging on the server before delivery is that the message will be
lost if the system crashes <emphasis>after</emphasis> acknowledging the message on the server
but <emphasis>before</emphasis> it is delivered to the client. In that case, the message is
@@ -62,14 +66,14 @@
// messages will be acknowledge on the server *before* being delivered to the client
Session session = connection.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
</programlisting>
- <para>Or you can set pre-acknowledge directly on the <literal>HornetQConnectionFactory</literal>
- instance using the setter method.</para>
+ <para>Or you can set pre-acknowledge directly on the <literal
+ >HornetQConnectionFactory</literal> instance using the setter method.</para>
<para>To use pre-acknowledgement mode using the core API you can set it directly on the
<literal>ClientSessionFactory</literal> instance using the setter method.</para>
</section>
<section>
<title>Example</title>
- <para>See <xref linkend="examples.pre-acknowledge" /> for an example which
- shows how to use pre-acknowledgement mode with with JMS.</para>
+ <para>See <xref linkend="examples.pre-acknowledge"/> for an example which shows how to use
+ pre-acknowledgement mode with with JMS.</para>
</section>
</chapter>
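Illustration of the note added in this revision: a toy, self-contained model of why pre-acknowledge gives up transactional semantics on the consuming side. This is not HornetQ code. ToyQueue and its methods are hypothetical names invented for this sketch, and the real server is considerably more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model, NOT HornetQ code: every name here is hypothetical.
class ToyQueue
{
   private final Deque<String> messages = new ArrayDeque<String>();

   // Messages delivered in the current transaction but not yet committed.
   private final List<String> inFlight = new ArrayList<String>();

   private final boolean preAcknowledge;

   ToyQueue(final boolean preAcknowledge)
   {
      this.preAcknowledge = preAcknowledge;
   }

   void send(final String msg)
   {
      messages.addLast(msg);
   }

   // Hands the next message to the "client", or returns null if the queue is empty.
   String deliver()
   {
      String msg = messages.pollFirst();

      if (msg != null && !preAcknowledge)
      {
         // Transacted consumption: remember the message until commit or rollback.
         inFlight.add(msg);
      }

      // With pre-acknowledge the message is already gone from the server's point of view.
      return msg;
   }

   void commit()
   {
      inFlight.clear();
   }

   // Puts uncommitted deliveries back at the head of the queue, in their original order.
   void rollback()
   {
      for (int i = inFlight.size() - 1; i >= 0; i--)
      {
         messages.addFirst(inFlight.get(i));
      }

      inFlight.clear();
   }

   int size()
   {
      return messages.size();
   }
}
```

With new ToyQueue(false), a deliver() followed by rollback() leaves the message on the queue for redelivery. With new ToyQueue(true), the same sequence leaves the queue empty, which is the behaviour the note warns about.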
JBoss hornetq SVN: r7944 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-09 09:06:55 -0400 (Wed, 09 Sep 2009)
New Revision: 7944
Modified:
trunk/docs/user-manual/en/using-core.xml
Log:
fixed typo in docs about message priority
Modified: trunk/docs/user-manual/en/using-core.xml
===================================================================
--- trunk/docs/user-manual/en/using-core.xml 2009-09-08 16:41:34 UTC (rev 7943)
+++ trunk/docs/user-manual/en/using-core.xml 2009-09-09 13:06:55 UTC (rev 7944)
@@ -45,7 +45,7 @@
will survive a server crash or restart. Non durable messages will never survive a
server crash or restart.</para>
<para>Messages can be specified with a priority value between 0 and 9. 0 represents the
- highest priority and 9 represents the lowest. HornetQ will attempt to deliver higher
+ lowest priority and 9 represents the highest. HornetQ will attempt to deliver higher
priority messages before lower priority ones.</para>
<para>Messages can be specified with an optional expiry time. HornetQ will not deliver
messages after its expiry time has been exceeded.</para>
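Illustration of the corrected wording (0 is the lowest priority, 9 the highest): a minimal sketch that orders messages by descending priority and, within one priority, by arrival order. PriorityOrderSketch and deliveryOrder are hypothetical names. This sketch orders strictly, whereas the documentation only says HornetQ will attempt to deliver higher priority messages first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; this is not how the HornetQ queue is implemented.
class PriorityOrderSketch
{
   private static final class Msg
   {
      final String body;

      final int priority;

      // Arrival sequence number, used to keep FIFO order within one priority.
      final long seq;

      Msg(final String body, final int priority, final long seq)
      {
         this.body = body;
         this.priority = priority;
         this.seq = seq;
      }
   }

   // Returns the bodies in the order a strict priority queue would hand them out.
   static List<String> deliveryOrder(final String[] bodies, final int[] priorities)
   {
      if (bodies.length != priorities.length)
      {
         throw new IllegalArgumentException("bodies and priorities must have the same length");
      }

      // Highest priority value first (9 before 0), then earliest arrival first.
      Comparator<Msg> order = Comparator.comparingInt((Msg m) -> m.priority)
                                        .reversed()
                                        .thenComparingLong(m -> m.seq);

      PriorityQueue<Msg> queue = new PriorityQueue<Msg>(order);

      for (int i = 0; i < bodies.length; i++)
      {
         if (priorities[i] < 0 || priorities[i] > 9)
         {
            throw new IllegalArgumentException("priority must be between 0 and 9");
         }

         queue.add(new Msg(bodies[i], priorities[i], i));
      }

      List<String> result = new ArrayList<String>();

      while (!queue.isEmpty())
      {
         result.add(queue.poll().body);
      }

      return result;
   }
}
```

For bodies {"low", "high", "mid", "high2"} with priorities {0, 9, 4, 9}, the resulting order is high, high2, mid, low.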
JBoss hornetq SVN: r7943 - in trunk: src/main/org/hornetq/core/remoting/server/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-08 12:41:34 -0400 (Tue, 08 Sep 2009)
New Revision: 7943
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-121
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -149,7 +149,7 @@
if (handler != null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Cannot call receive(...) - a MessageHandler is set");
+ "Cannot call receive(...) - a MessageHandler is set");
}
if (clientWindowSize == 0)
@@ -210,7 +210,7 @@
{
// if we have already pre acked we cant expire
boolean expired = m.isExpired();
-
+
flowControlBeforeConsumption(m);
if (expired)
@@ -279,7 +279,7 @@
if (receiverThread != null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Cannot set MessageHandler - consumer is in receive(...)");
+ "Cannot set MessageHandler - consumer is in receive(...)");
}
boolean noPreviousHandler = handler == null;
@@ -400,7 +400,7 @@
// Flow control for the first packet, we will have others
flowControl(packet.getPacketSize(), false);
-
+
ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
@@ -439,18 +439,18 @@
{
synchronized (this)
{
- //Need to send credits for the messages in the buffer
-
- for (ClientMessageInternal message: this.buffer)
+ // Need to send credits for the messages in the buffer
+
+ for (ClientMessageInternal message : this.buffer)
{
flowControlBeforeConsumption(message);
}
buffer.clear();
}
-
- //Need to send credits for the messages in the buffer
+ // Need to send credits for the messages in the buffer
+
waitForOnMessageToComplete();
}
@@ -742,17 +742,19 @@
flushAcks();
+ clearBuffer();
+
if (sendCloseMessage)
{
channel.sendBlocking(new SessionConsumerCloseMessage(id));
}
-
- clearBuffer();
}
- finally
+ catch (Throwable t)
{
- session.removeConsumer(this);
+ // Consumer close should always return without exception
}
+
+ session.removeConsumer(this);
}
private void clearBuffer()
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -461,7 +461,7 @@
}
public void run()
- {
+ {
while (!closed)
{
long now = System.currentTimeMillis();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -21,8 +21,10 @@
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.server.HornetQServer;
@@ -107,6 +109,61 @@
}
}
}
+
+ //Closing a session if the underlying remoting connection is dead should cleanly
+ //release all resources
+ public void testCloseSessionOnDestroyedConnection() throws Exception
+ {
+ HornetQServer server = createServer(false);
+ try
+ {
+ //Make sure we have a short connection TTL so sessions will be quickly closed on the server
+ long ttl = 500;
+ server.getConfiguration().setConnectionTTLOverride(ttl);
+ server.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSessionInternal clientSession = (ClientSessionInternal)cf.createSession(false, true, true);
+ clientSession.createQueue(queueName, queueName, false);
+ ClientProducer producer = clientSession.createProducer();
+ ClientConsumer consumer = clientSession.createConsumer(queueName);
+
+ assertEquals(1, server.getRemotingService().getConnections().size());
+
+ RemotingConnection rc = clientSession.getConnection();
+
+ rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+
+ clientSession.close();
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int cons = server.getRemotingService().getConnections().size();
+
+ if (cons == 0)
+ {
+ break;
+ }
+
+ long now = System.currentTimeMillis();
+
+ if (now - start > 10000)
+ {
+ throw new Exception("Timed out waiting for connections to close");
+ }
+
+ Thread.sleep(50);
+ }
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
public void testBindingQuery() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -16,16 +16,9 @@
import javax.jms.Session;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.integration.jms.server.management.NullInitialContext;
import org.hornetq.tests.util.JMSTestBase;
-import org.hornetq.tests.util.UnitTestCase;
/**
*
Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java 2009-09-08 16:41:34 UTC (rev 7943)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A CloseDestroyedConnectionTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseDestroyedConnectionTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(CloseDestroyedConnectionTest.class);
+
+ private HornetQConnectionFactory cf;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ cf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ cf.setBlockOnPersistentSend(true);
+ cf.setPreAcknowledge(true);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+
+ super.tearDown();
+ }
+
+ /*
+ * Closing a connection that is destroyed should cleanly close everything without throwing exceptions
+ */
+ public void testCloseDestroyedConnection() throws Exception
+ {
+ long connectionTTL = 500;
+ cf.setClientFailureCheckPeriod(connectionTTL / 2);
+ // Need to set connection ttl to a low figure so connections get removed quickly on the server
+ cf.setConnectionTTL(connectionTTL);
+
+ Connection conn = cf.createConnection();
+
+ assertEquals(1, server.getRemotingService().getConnections().size());
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Give time for the initial ping to reach the server before we fail (it has connection TTL in it)
+ Thread.sleep(500);
+
+ String queueName = "myqueue";
+
+ Queue queue = new HornetQQueue(queueName);
+
+ super.createQueue(queueName);
+
+ MessageConsumer consumer = sess.createConsumer(queue);
+
+ MessageProducer producer = sess.createProducer(queue);
+
+ QueueBrowser browser = sess.createBrowser(queue);
+
+ // Now fail the underlying connection
+
+ ClientSessionInternal sessi = (ClientSessionInternal)((HornetQSession)sess).getCoreSession();
+
+ RemotingConnection rc = sessi.getConnection();
+
+ rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+
+ // Now close the connection
+
+ conn.close();
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int cons = server.getRemotingService().getConnections().size();
+
+ if (cons == 0)
+ {
+ break;
+ }
+
+ long now = System.currentTimeMillis();
+
+ if (now - start > 10000)
+ {
+ throw new Exception("Timed out waiting for connections to close");
+ }
+
+ Thread.sleep(50);
+ }
+ }
+}
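Both new tests in this revision repeat the same loop: poll the server's connection count until it reaches zero, and give up after 10 seconds. A possible way to factor that out is sketched below. WaitUtil and waitFor are hypothetical names, not part of the HornetQ test utilities, and the sketch uses java.util.function, so it assumes Java 8 or later.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the HornetQ test base classes.
class WaitUtil
{
   // Polls the condition until it holds or the timeout elapses.
   // Returns true if the condition became true, false on timeout or interruption.
   static boolean waitFor(final BooleanSupplier condition, final long timeoutMillis, final long pollMillis)
   {
      final long start = System.currentTimeMillis();

      while (true)
      {
         if (condition.getAsBoolean())
         {
            return true;
         }

         if (System.currentTimeMillis() - start > timeoutMillis)
         {
            return false;
         }

         try
         {
            Thread.sleep(pollMillis);
         }
         catch (InterruptedException e)
         {
            // Restore the interrupt flag and report that the condition was not reached.
            Thread.currentThread().interrupt();

            return false;
         }
      }
   }
}
```

A test could then assert that the wait succeeded, passing a lambda that checks server.getRemotingService().getConnections().size() == 0 with a timeout of 10000 and a poll interval of 50, as in the loops above.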
JBoss hornetq SVN: r7939 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-04 13:41:16 -0400 (Fri, 04 Sep 2009)
New Revision: 7939
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
small tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-04 17:34:23 UTC (rev 7938)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-04 17:41:16 UTC (rev 7939)
@@ -1588,12 +1588,6 @@
msg2.setOutputStream(createFakeOutputStream());
assertTrue(msg2.waitOutputStreamCompletion(60000));
- // for (int i = 0; i < SIZE; i++)
- // {
- // byte value = msg2.getBody().readByte();
- // assertEquals("Error position " + i, (byte)'a', value);
- // }
-
session.commit();
assertGlobalSize(server);
JBoss hornetq SVN: r7938 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-04 13:34:23 -0400 (Fri, 04 Sep 2009)
New Revision: 7938
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
Adding small test on delivery count
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-03 22:28:19 UTC (rev 7937)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-04 17:34:23 UTC (rev 7938)
@@ -275,6 +275,89 @@
}
}
+
+ public void testDeliveryCount() throws Exception
+ {
+ final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = consumer.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(1, msg.getDeliveryCount());
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+ session.rollback();
+
+ session.close();
+
+ session = sf.createSession(false, false, false);
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+ assertEquals(2, msg.getDeliveryCount());
+ msg.acknowledge();
+ consumer.close();
+
+ session.commit();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+
public void testDLAOnExpiry() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
JBoss hornetq SVN: r7937 - in trunk: tests/src/org/hornetq/tests/integration/jms/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-03 18:28:19 -0400 (Thu, 03 Sep 2009)
New Revision: 7937
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
Log:
HORNETQ-123 - Fixing re send ObjectMessage
Modified: trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2009-09-03 21:06:39 UTC (rev 7936)
+++ trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2009-09-03 22:28:19 UTC (rev 7937)
@@ -293,7 +293,23 @@
super.doBeforeSend();
}
+
+ public void doBeforeReceive() throws Exception
+ {
+ super.doBeforeReceive();
+ try
+ {
+ int len = getBody().readInt();
+ data = new byte[len];
+ getBody().readBytes(data);
+ }
+ catch (Exception e)
+ {
+ data = null;
+ }
+ }
+
// ObjectMessage implementation ----------------------------------
public void setObject(Serializable object) throws JMSException
@@ -326,22 +342,8 @@
// lazy deserialize the Object the first time the client requests it
public Serializable getObject() throws JMSException
{
- if (data == null)
+ if (data == null || data.length == 0)
{
- try
- {
- int len = getBody().readInt();
- data = new byte[len];
- getBody().readBytes(data);
- }
- catch (Exception e)
- {
- return null;
- }
- }
-
- if (data.length == 0)
- {
return null;
}
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-03 22:28:19 UTC (rev 7937)
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * Receive Messages and resend them, like the bridge would do
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ResendTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testResendMessage() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ try
+ {
+ conn.start();
+
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ ArrayList<Message> msgs = new ArrayList<Message>();
+
+ for (int i = 0; i < 10; i++)
+ {
+ MapMessage mm = sess.createMapMessage();
+ mm.setBoolean("boolean", true);
+ mm.setByte("byte", (byte)3);
+ mm.setBytes("bytes", new byte[] { (byte)3, (byte)4, (byte)5 });
+ mm.setChar("char", (char)6);
+ mm.setDouble("double", 7.0);
+ mm.setFloat("float", 8.0f);
+ mm.setInt("int", 9);
+ mm.setLong("long", 10l);
+ mm.setObject("object", new String("this is an object"));
+ mm.setShort("short", (short)11);
+ mm.setString("string", "this is a string");
+
+ msgs.add(mm);
+ msgs.add(sess.createTextMessage("hello" + i));
+ msgs.add(sess.createObjectMessage(new SomeSerializable("hello" + i)));
+ }
+
+ internalTestResend(msgs, sess);
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ public void internalTestResend(ArrayList<Message> msgs, Session sess) throws Exception
+ {
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ for (Message msg : msgs)
+ {
+ prod.send(msg);
+ }
+
+ sess.commit();
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ for (int i = 0; i < msgs.size(); i++)
+ {
+ Message msg = cons.receive(5000);
+ assertNotNull(msg);
+
+ prod.send(msg);
+ }
+
+ assertNull(cons.receiveNoWait());
+
+ sess.commit();
+
+ for (Message originalMessage : msgs)
+ {
+ Message copiedMessage = cons.receive(5000);
+ assertNotNull(copiedMessage);
+
+ assertEquals(copiedMessage.getClass(), originalMessage.getClass());
+
+ sess.commit();
+
+ if (copiedMessage instanceof MapMessage)
+ {
+ MapMessage copiedMap = (MapMessage)copiedMessage;
+ MapMessage originalMap = (MapMessage)originalMessage;
+ assertEquals(originalMap.getString("str"), copiedMap.getString("str"));
+ assertEquals(originalMap.getLong("long"), copiedMap.getLong("long"));
+ assertEquals(originalMap.getInt("int"), copiedMap.getInt("int"));
+ assertEquals(originalMap.getObject("object"), copiedMap.getObject("object"));
+ }
+ else if (copiedMessage instanceof ObjectMessage)
+ {
+ assertEquals(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
+ }
+ else if (copiedMessage instanceof TextMessage)
+ {
+ assertEquals(((TextMessage)originalMessage).getText(), ((TextMessage)copiedMessage).getText());
+ }
+ }
+
+ }
+
+ public static class SomeSerializable implements Serializable
+ {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -8576054940441747312L;
+
+ final String txt;
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (txt == null ? 0 : txt.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(final Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (getClass() != obj.getClass())
+ {
+ return false;
+ }
+ SomeSerializable other = (SomeSerializable)obj;
+ if (txt == null)
+ {
+ if (other.txt != null)
+ {
+ return false;
+ }
+ }
+ else if (!txt.equals(other.txt))
+ {
+ return false;
+ }
+ return true;
+ }
+
+ SomeSerializable(final String txt)
+ {
+ this.txt = txt;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queue = createQueue("queue1");
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ queue = null;
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
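The doBeforeReceive() change to HornetQObjectMessage in this revision reads a length-prefixed byte array from the message body up front and falls back to null if that fails. The same read pattern, sketched with plain java.io streams rather than the HornetQ buffer API. LengthPrefixedBody, encode and decodeOrNull are hypothetical names, and the explicit length check is an addition of this sketch, not something the commit does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration using java.io; the commit itself uses getBody().readInt()/readBytes().
class LengthPrefixedBody
{
   // Writes a 4-byte big-endian length followed by the payload bytes.
   static byte[] encode(final byte[] payload)
   {
      try
      {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bytes);
         out.writeInt(payload.length);
         out.write(payload);
         out.flush();

         return bytes.toByteArray();
      }
      catch (IOException e)
      {
         // Writing to an in-memory stream is not expected to fail.
         throw new IllegalStateException(e);
      }
   }

   // Reads the length prefix and then the payload; returns null if the input is
   // truncated or the length is implausible, mirroring "data = null" in the commit.
   static byte[] decodeOrNull(final byte[] wire)
   {
      try
      {
         DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
         int len = in.readInt();

         if (len < 0 || len > in.available())
         {
            return null;
         }

         byte[] data = new byte[len];
         in.readFully(data);

         return data;
      }
      catch (IOException e)
      {
         return null;
      }
   }
}
```

A caller in the style of the revised getObject() would treat both a null result and a zero-length array as "no object".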
JBoss hornetq SVN: r7936 - trunk/tests/src/org/hornetq/tests/unit/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-03 17:06:39 -0400 (Thu, 03 Sep 2009)
New Revision: 7936
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
Fixing test
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-09-03 17:38:43 UTC (rev 7935)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-09-03 21:06:39 UTC (rev 7936)
@@ -257,6 +257,10 @@
catch (IndexOutOfBoundsException ignored)
{
}
+ catch (IllegalAccessError ignored)
+ {
+
+ }
catch (Throwable e)
{
e.printStackTrace();
JBoss hornetq SVN: r7935 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-03 13:38:43 -0400 (Thu, 03 Sep 2009)
New Revision: 7935
Modified:
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
Adding exception check on closed consumers and large message
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-09-02 10:18:43 UTC (rev 7934)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-09-03 17:38:43 UTC (rev 7935)
@@ -63,6 +63,8 @@
private final long totalSize;
private boolean streamEnded = false;
+
+ private boolean streamClosed = false;
private final int readTimeout;
@@ -213,6 +215,7 @@
{
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
+ streamClosed = true;
notifyAll();
}
@@ -1262,6 +1265,11 @@
{
throw new IndexOutOfBoundsException();
}
+
+ if (streamClosed)
+ {
+ throw new IllegalAccessError("The consumer associated with this large message was closed before the body was read");
+ }
if (fileCache == null)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-02 10:18:43 UTC (rev 7934)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-03 17:38:43 UTC (rev 7935)
@@ -67,7 +67,83 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
+
+ public void testCloseConsumer() throws Exception
+ {
+ final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ log.info("*********** starting test");
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(ADDRESS, ADDRESS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ log.info("*********** sending large message");
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ msg1.acknowledge();
+ session.commit();
+ assertNotNull(msg1);
+
+ consumer.close();
+
+ try
+ {
+ msg1.getBody().readByte();
+ fail ("Exception was expected");
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+
+
public void testDLALargeMessage() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
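The streamClosed flag added to LargeMessageBufferImpl in this revision makes reads fail once the consumer has been closed. A stripped-down sketch of that guard follows. ClosableBodySketch is a hypothetical class, and it throws IllegalStateException where the commit throws IllegalAccessError. That substitution is a choice made for this sketch only.

```java
// Hypothetical illustration of the "fail reads after close" guard; not HornetQ code.
class ClosableBodySketch
{
   private final byte[] body;

   private int position;

   private boolean closed;

   ClosableBodySketch(final byte[] body)
   {
      this.body = body.clone();
   }

   // Marks the body as closed; later reads are rejected.
   synchronized void close()
   {
      closed = true;
   }

   synchronized byte readByte()
   {
      // Same order of checks as in the commit: bounds first, then the closed flag.
      if (position >= body.length)
      {
         throw new IndexOutOfBoundsException();
      }

      if (closed)
      {
         throw new IllegalStateException("The body was closed before it was fully read");
      }

      return body[position++];
   }
}
```

Reading works until close() is called. After that, any read within bounds throws, which is the behaviour testCloseConsumer expects when it calls readByte() after consumer.close().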
JBoss hornetq SVN: r7934 - in trunk: src/config and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-02 06:18:43 -0400 (Wed, 02 Sep 2009)
New Revision: 7934
Added:
trunk/src/main/org/hornetq/ra/inflow/JBoss4TransactionManagerLocator.java
trunk/src/main/org/hornetq/ra/inflow/JBoss5TransactionManagerLocator.java
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
trunk/src/config/ra.xml
trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-119 - added extra configuration to ra.xml to allow the class used to load the transaction manager to be configurable
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2009-08-31 17:11:31 UTC (rev 7933)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2009-09-02 10:18:43 UTC (rev 7934)
@@ -383,6 +383,16 @@
key=val;key=val; and will be specific to the connector used</entry>
</row>
<row>
+ <entry>TransactionManagerLocatorClass</entry>
+ <entry>String</entry>
+ <entry>The class to use to load the transaction manager</entry>
+ </row>
+ <row>
+ <entry>TransactionManagerLocatorMethod</entry>
+ <entry>String</entry>
+ <entry>The method to invoke on the TransactionManagerLocatorClass to get the transaction manager</entry>
+ </row>
+ <row>
<entry>useLocalTx</entry>
<entry>boolean</entry>
<entry>True will enable local transaction optimisation.</entry>
Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml 2009-08-31 17:11:31 UTC (rev 7933)
+++ trunk/src/config/ra.xml 2009-09-02 10:18:43 UTC (rev 7934)
@@ -51,7 +51,20 @@
<config-property-type>java.lang.Boolean</config-property-type>
<config-property-value>true</config-property-value>
</config-property>
- <!-- <config-property>
+ <!--
+ <config-property>
+ <description>The class to use for locating the transaction manager</description>
+ <config-property-name>TransactionManagerLocatorClass</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>org.hornetq.ra.inflow.JBoss5TransactionManagerLocator</config-property-value>
+ </config-property>
+ <config-property>
+ <description>The method to use for locating the transaction manager</description>
+ <config-property-name>TransactionManagerLocatorMethod</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>getTm</config-property-value>
+ </config-property>
+ <config-property>
<description>Use A local Transaction instead of XA?</description>
<config-property-name>UseLocalTx</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2009-08-31 17:11:31 UTC (rev 7933)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2009-09-02 10:18:43 UTC (rev 7934)
@@ -47,6 +47,10 @@
/** Use Local TX instead of XA */
private Boolean localTx = false;
+ private String transactionManagerLocatorClass = "org.hornetq.ra.inflow.JBoss5TransactionManagerLocator";
+
+ private String transactionManagerLocatorMethod = "getTm";
+
/**
* Constructor
*/
@@ -185,5 +189,24 @@
return useXA != null && useXA;
}
-
+
+ public void setTransactionManagerLocatorClass(String transactionManagerLocatorClass)
+ {
+ this.transactionManagerLocatorClass = transactionManagerLocatorClass;
+ }
+
+ public String getTransactionManagerLocatorClass()
+ {
+ return transactionManagerLocatorClass;
+ }
+
+ public void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod)
+ {
+ this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+ }
+
+ public String getTransactionManagerLocatorMethod()
+ {
+ return transactionManagerLocatorMethod;
+ }
}
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-08-31 17:11:31 UTC (rev 7933)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-09-02 10:18:43 UTC (rev 7934)
@@ -211,6 +211,36 @@
log.info("HornetQ resource adapter stopped");
}
+ public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+ {
+ if (trace)
+ {
+ log.trace("setTransactionManagerLocatorClass(" + transactionManagerLocatorClass + ")");
+ }
+
+ raProperties.setTransactionManagerLocatorClass(transactionManagerLocatorClass);
+ }
+
+ public String getTransactionManagerLocatorClass()
+ {
+ return raProperties.getTransactionManagerLocatorClass();
+ }
+
+ public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+ {
+ if (trace)
+ {
+ log.trace("setTransactionManagerLocatorMethod(" + transactionManagerLocatorMethod + ")");
+ }
+
+ raProperties.setTransactionManagerLocatorMethod(transactionManagerLocatorMethod);
+ }
+
+ public String getTransactionManagerLocatorMethod()
+ {
+ return raProperties.getTransactionManagerLocatorMethod();
+ }
+
public void setConnectorClassName(final String connectorClassName)
{
if (trace)
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2009-08-31 17:11:31 UTC (rev 7933)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2009-09-02 10:18:43 UTC (rev 7934)
@@ -222,7 +222,18 @@
if (tm == null)
{
- tm = TransactionManagerLocator.locateTransactionManager();
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class aClass = loader.loadClass(ra.getTransactionManagerLocatorClass());
+ Object o = aClass.newInstance();
+ Method m = aClass.getMethod(ra.getTransactionManagerLocatorMethod());
+ tm = (TransactionManager) m.invoke(o);
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to create TransactionManager from " + ra.getTransactionManagerLocatorClass() + "." + ra.getTransactionManagerLocatorMethod());
+ }
}
return tm;
Added: trunk/src/main/org/hornetq/ra/inflow/JBoss4TransactionManagerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/JBoss4TransactionManagerLocator.java (rev 0)
+++ trunk/src/main/org/hornetq/ra/inflow/JBoss4TransactionManagerLocator.java 2009-09-02 10:18:43 UTC (rev 7934)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.ra.inflow;
+
+import org.jboss.tm.TransactionManagerLocator;
+
+import javax.transaction.TransactionManager;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JBoss4TransactionManagerLocator
+{
+ public TransactionManager getTm()
+ {
+ return TransactionManagerLocator.getInstance().locate();
+ }
+}
Added: trunk/src/main/org/hornetq/ra/inflow/JBoss5TransactionManagerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/JBoss5TransactionManagerLocator.java (rev 0)
+++ trunk/src/main/org/hornetq/ra/inflow/JBoss5TransactionManagerLocator.java 2009-09-02 10:18:43 UTC (rev 7934)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.ra.inflow;
+
+import org.jboss.tm.TransactionManagerLocator;
+
+import javax.transaction.TransactionManager;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JBoss5TransactionManagerLocator
+{
+ public TransactionManager getTm()
+ {
+ return TransactionManagerLocator.locateTransactionManager();
+ }
+}
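
Illustrative note (not part of the commit): the change to HornetQActivation above resolves the transaction manager by reflection, using the class and method names configured on the resource adapter. The sketch below shows the same lookup pattern in isolation. It is a minimal sketch under stated assumptions: LocatorSketch, FakeTransactionManager and DemoLocator are hypothetical names invented for this example, FakeTransactionManager stands in for javax.transaction.TransactionManager so that the code compiles without a JTA jar, and the class loader is passed in as a parameter, whereas the committed code uses the thread context class loader.

```java
import java.lang.reflect.Method;

/** Hypothetical, self-contained sketch of the reflective locator lookup. */
final class LocatorSketch
{
   /** Hypothetical stand-in for javax.transaction.TransactionManager. */
   public interface FakeTransactionManager
   {
      String name();
   }

   /**
    * Hypothetical locator. Like the JBoss4/JBoss5 locators in the commit, it
    * has a public no-arg constructor and a public no-arg lookup method.
    */
   public static class DemoLocator
   {
      public FakeTransactionManager getTm()
      {
         return () -> "demo-tm";
      }
   }

   /**
    * Loads className, instantiates it and invokes the no-arg method
    * methodName on the new instance. Returns null if any step fails, which
    * mirrors the committed code leaving tm as null after logging a warning.
    */
   static Object locate(ClassLoader loader, String className, String methodName)
   {
      try
      {
         Class<?> aClass = loader.loadClass(className);
         // getDeclaredConstructor().newInstance() is the non-deprecated
         // equivalent of the Class.newInstance() call used in the commit.
         Object locator = aClass.getDeclaredConstructor().newInstance();
         Method m = aClass.getMethod(methodName);
         return m.invoke(locator);
      }
      catch (Exception e)
      {
         return null;
      }
   }

   public static void main(String[] args)
   {
      Object tm = locate(LocatorSketch.class.getClassLoader(),
                         DemoLocator.class.getName(),
                         "getTm");
      System.out.println(tm != null ? "located a transaction manager" : "lookup failed");
   }
}
```

One consequence of this pattern is that a misspelled class or method name in ra.xml only surfaces at runtime, as a null transaction manager plus a warning in the log, so the configured values are worth checking against the locator class actually deployed.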