JBoss hornetq SVN: r8905 - trunk/src/main/org/hornetq/ra/inflow.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-02-26 19:01:24 -0500 (Fri, 26 Feb 2010)
New Revision: 8905
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-312 - Fixing multiple sessions on MDB durable
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-02-26 17:32:57 UTC (rev 8904)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-02-27 00:01:24 UTC (rev 8905)
@@ -261,7 +261,7 @@
{
ClientSession session = setupSession();
- HornetQMessageHandler handler = new HornetQMessageHandler(this, session);
+ HornetQMessageHandler handler = new HornetQMessageHandler(this, session, i);
handler.setup();
session.start();
handlers.add(handler);
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-02-26 17:32:57 UTC (rev 8904)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-02-27 00:01:24 UTC (rev 8905)
@@ -64,11 +64,14 @@
private final HornetQActivation activation;
private boolean useLocalTx;
+
+ private final int sessionNr;
- public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session)
+ public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session, final int sessionNr)
{
this.activation = activation;
this.session = session;
+ this.sessionNr = sessionNr;
}
public void setup() throws Exception
@@ -107,6 +110,13 @@
}
else
{
+ // The check for already exists should be done only at the first session
+ // As a deployed MDB could set up multiple instances in order to process messages in parallel.
+ if (sessionNr == 0 && subResponse.getConsumerCount() > 0)
+ {
+ throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+ }
+
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
14 years, 10 months
JBoss hornetq SVN: r8904 - in trunk: tests/jms-tests/src/org/hornetq/jms/tests and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-02-26 12:32:57 -0500 (Fri, 26 Feb 2010)
New Revision: 8904
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/MessageConsumerTest.java
Log:
fixed closed consumer
* checkClosed() must throw a JMS illegal state exception even when only
the consumer has been closed (and not the session or connection)
* add fail() calls to MessageConsumerTest to check methods can not be called on a closed consumer
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-02-26 14:40:18 UTC (rev 8903)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-02-26 17:32:57 UTC (rev 8904)
@@ -64,6 +64,8 @@
private final String selector;
private final SimpleString autoDeleteQueueName;
+
+ private boolean closed = false;
// Constructors --------------------------------------------------
@@ -154,12 +156,18 @@
{
throw JMSExceptionHelper.convertFromHornetQException(e);
}
+ finally
+ {
+ closed = true;
+ }
}
// QueueReceiver implementation ----------------------------------
public Queue getQueue() throws JMSException
{
+ checkClosed();
+
return (Queue)destination;
}
@@ -167,6 +175,8 @@
public Topic getTopic() throws JMSException
{
+ checkClosed();
+
return (Topic)destination;
}
@@ -191,7 +201,7 @@
private void checkClosed() throws JMSException
{
- if (session.getCoreSession().isClosed())
+ if (closed || session.getCoreSession().isClosed())
{
throw new IllegalStateException("Consumer is closed");
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/MessageConsumerTest.java 2010-02-26 14:40:18 UTC (rev 8903)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/MessageConsumerTest.java 2010-02-26 17:32:57 UTC (rev 8904)
@@ -1572,6 +1572,7 @@
try
{
topicConsumer.getMessageSelector();
+ fail("must throw a JMS IllegalStateException");
}
catch (javax.jms.IllegalStateException e)
{
@@ -1629,6 +1630,7 @@
try
{
((TopicSubscriber)topicConsumer).getTopic();
+ fail("must throw a JMS IllegalStateException");
}
catch (javax.jms.IllegalStateException e)
{
@@ -1686,6 +1688,7 @@
try
{
((QueueReceiver)queueConsumer).getQueue();
+ fail("must throw a JMS IllegalStateException");
}
catch (javax.jms.IllegalStateException e)
{
14 years, 10 months
JBoss hornetq SVN: r8903 - in trunk: tests/src/org/hornetq/tests/integration/xa and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-02-26 09:40:18 -0500 (Fri, 26 Feb 2010)
New Revision: 8903
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-313 - add checks only for local tx when id is the same
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-02-26 14:37:37 UTC (rev 8902)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-02-26 14:40:18 UTC (rev 8903)
@@ -523,7 +523,7 @@
public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
- if (tx != null)
+ if (tx != null && tx.getXid().equals(xid))
{
final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
@@ -557,7 +557,7 @@
if (theTx.getState() == Transaction.State.SUSPENDED)
{
// Put it back
- resourceManager.putTransaction(xid, tx);
+ resourceManager.putTransaction(xid, theTx);
throw new HornetQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
}
@@ -697,7 +697,7 @@
public void xaRollback(final Xid xid) throws Exception
{
- if (tx != null)
+ if (tx != null && tx.getXid().equals(xid))
{
final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
@@ -794,7 +794,7 @@
public void xaPrepare(final Xid xid) throws Exception
{
- if (tx != null)
+ if (tx != null && tx.getXid().equals(xid))
{
final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-02-26 14:37:37 UTC (rev 8902)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-02-26 14:40:18 UTC (rev 8903)
@@ -139,6 +139,64 @@
session2.close();
}
+
+
+ public void testXAInterleaveResourceSuspendWorkCommit() throws Exception
+ {
+ Xid xid = newXID();
+ Xid xid2 = newXID();
+ ClientProducer clientProducer = clientSession.createProducer(atestq);
+ ClientSession recSession = sessionFactory.createSession();
+ recSession.start();
+ ClientConsumer clientConsumer = recSession.createConsumer(atestq);
+ ClientMessage m1 = createTextMessage(clientSession, "m1");
+ ClientMessage m2 = createTextMessage(clientSession, "m2");
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientSession.end(xid, XAResource.TMSUSPEND);
+ clientSession.start(xid2, XAResource.TMNOFLAGS);
+ clientProducer.send(m2);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.commit(xid, true);
+ ClientMessage message = clientConsumer.receiveImmediate();
+ assertNotNull(message);
+ message = clientConsumer.receiveImmediate();
+ assertNull(message);
+ clientSession.end(xid2, XAResource.TMSUCCESS);
+ clientSession.commit(xid2, true);
+ message = clientConsumer.receiveImmediate();
+ assertNotNull(message);
+ }
+
+ public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception
+ {
+ Xid xid = newXID();
+ Xid xid2 = newXID();
+ Xid xid3 = newXID();
+ ClientProducer clientProducer = clientSession.createProducer(atestq);
+ ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+ ClientMessage m1 = createTextMessage(clientSession, "m1");
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+ clientSession.commit(xid, false);
+ clientSession.start();
+ clientSession.start(xid2, XAResource.TMNOFLAGS);
+ ClientMessage m2 = clientConsumer.receiveImmediate();
+ assertNotNull(m2);
+ clientSession.end(xid2, XAResource.TMSUCCESS);
+ clientSession.prepare(xid2);
+ clientSession.rollback(xid2);
+
+ clientSession.start(xid3, XAResource.TMNOFLAGS);
+ m2 = clientConsumer.receiveImmediate();
+ assertNotNull(m2);
+ clientSession.end(xid3, XAResource.TMSUCCESS);
+ clientSession.prepare(xid3);
+ clientSession.commit(xid3, false);
+ }
+
public void testSendPrepareDoesntRollbackOnClose() throws Exception
{
Xid xid = newXID();
14 years, 10 months
JBoss hornetq SVN: r8902 - trunk/src/main/org/hornetq/jms/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-02-26 09:37:37 -0500 (Fri, 26 Feb 2010)
New Revision: 8902
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-314 - add check for temp queues
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-02-26 01:06:23 UTC (rev 8901)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-02-26 14:37:37 UTC (rev 8902)
@@ -311,9 +311,23 @@
final int maxMessages) throws JMSException
{
checkClosed();
+
+ checkTempQueues(destination);
return null;
}
+ private void checkTempQueues(Destination destination)
+ throws JMSException
+ {
+ HornetQDestination jbdest = (HornetQDestination)destination;
+
+ if (jbdest.isTemporary() && !containsTemporaryQueue(jbdest.getSimpleAddress()))
+ {
+ throw new JMSException("Can not create consumer for temporary destination " + destination +
+ " from another JMS connection");
+ }
+ }
+
public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
final String subscriptionName,
final String messageSelector,
@@ -327,7 +341,7 @@
String msg = "Cannot create a durable connection consumer on a QueueConnection";
throw new javax.jms.IllegalStateException(msg);
}
-
+ checkTempQueues(topic);
// TODO
return null;
}
@@ -346,7 +360,7 @@
final int maxMessages) throws JMSException
{
checkClosed();
-
+ checkTempQueues(queue);
return null;
}
@@ -364,7 +378,7 @@
final int maxMessages) throws JMSException
{
checkClosed();
-
+ checkTempQueues(topic);
return null;
}
14 years, 10 months
JBoss hornetq SVN: r8901 - trunk/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-02-25 20:06:23 -0500 (Thu, 25 Feb 2010)
New Revision: 8901
Modified:
trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
Log:
Adding property for backward compatibility with AS5 and integration on AS6
Modified: trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2010-02-26 00:54:26 UTC (rev 8900)
+++ trunk/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2010-02-26 01:06:23 UTC (rev 8901)
@@ -273,6 +273,12 @@
this.clientID = clientID;
}
+ /** This is for backward compatibility */
+ public void setClientId(final String clientID)
+ {
+ setClientID(clientID);
+ }
+
public Integer getDupsOKBatchSize()
{
if (ConnectionFactoryProperties.trace)
14 years, 10 months
JBoss hornetq SVN: r8900 - trunk/src/main/org/hornetq/ra/inflow.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-02-25 19:54:26 -0500 (Thu, 25 Feb 2010)
New Revision: 8900
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-312 and adding a few backwards compatilibity properites on the resource adapter
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-02-25 14:47:40 UTC (rev 8899)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-02-26 00:54:26 UTC (rev 8900)
@@ -718,22 +718,22 @@
// here for backwards compatibilty
public void setUseDLQ(final boolean b)
{
-
}
public void setDLQJNDIName(final String name)
{
-
}
+
+ public void setDLQHandler(final String handler)
+ {
+ }
public void setDLQMaxResent(final int maxResent)
{
-
}
public void setProviderAdapterJNDI(final String jndi)
{
-
}
/**
@@ -751,4 +751,9 @@
}
+ public void setReconnectInterval(long interval)
+ {
+ }
+
+
}
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-02-25 14:47:40 UTC (rev 8899)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-02-26 00:54:26 UTC (rev 8900)
@@ -107,12 +107,6 @@
}
else
{
- // Already exists
- if (subResponse.getConsumerCount() > 0)
- {
- throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
- }
-
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
14 years, 10 months
JBoss hornetq SVN: r8899 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-02-25 09:47:40 -0500 (Thu, 25 Feb 2010)
New Revision: 8899
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* do *not* flush when sending a frame on the wire
* removed the executor step when handling a buffer
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-24 09:36:53 UTC (rev 8898)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-25 14:47:40 UTC (rev 8899)
@@ -141,20 +141,14 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
- executor.execute(new Runnable()
+ try
{
- public void run()
- {
- try
- {
- doHandleBuffer(connection, buffer);
- }
- finally
- {
- server.getStorageManager().clearContext();
- }
- }
- });
+ doHandleBuffer(connection, buffer);
+ }
+ finally
+ {
+ server.getStorageManager().clearContext();
+ }
}
private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
@@ -618,7 +612,7 @@
try
{
HornetQBuffer buffer = frame.toHornetQBuffer();
- connection.getTransportConnection().write(buffer, true);
+ connection.getTransportConnection().write(buffer, false);
}
catch (Exception e)
{
14 years, 10 months
JBoss hornetq SVN: r8898 - in trunk: tests/src/org/hornetq/tests/concurrent and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-02-24 04:36:53 -0500 (Wed, 24 Feb 2010)
New Revision: 8898
Added:
trunk/tests/src/org/hornetq/tests/concurrent/stomp/
trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added concurrent test with 1 socket sending Stomp messages and another socket receiving them
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-23 16:03:09 UTC (rev 8897)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-24 09:36:53 UTC (rev 8898)
@@ -141,14 +141,20 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
- try
+ executor.execute(new Runnable()
{
- doHandleBuffer(connection, buffer);
- }
- finally
- {
- server.getStorageManager().clearContext();
- }
+ public void run()
+ {
+ try
+ {
+ doHandleBuffer(connection, buffer);
+ }
+ finally
+ {
+ server.getStorageManager().clearContext();
+ }
+ }
+ });
}
private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
@@ -542,13 +548,7 @@
public void done()
{
- executor.execute(new Runnable()
- {
- public void run()
- {
- doSend(connection, frame);
- }
- });
+ doSend(connection, frame);
}
});
}
Added: trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java 2010-02-24 09:36:53 UTC (rev 8898)
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.concurrent.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
+
+public class ConcurrentStompTest extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(ConcurrentStompTest.class);
+
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private Socket stompSocket_2;
+
+ private ByteArrayOutputStream inputBuffer_2;
+
+ private HornetQServer server;
+
+ /**
+ * Send messages on 1 socket and receives them concurrently on another socket.
+ */
+ public void testSendManyMessages() throws Exception
+ {
+ String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+
+ sendFrame(stompSocket, connect);
+ String connected = receiveFrame(stompSocket, inputBuffer, 10000);
+ Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+ sendFrame(stompSocket_2, connect);
+ connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
+ Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+ final int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+
+ String subscribe =
+ "SUBSCRIBE\n" +
+ "destination:" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(stompSocket_2, subscribe);
+ Thread.sleep(2000);
+
+ new Thread()
+ {
+ public void run()
+ {
+ int i = 0;
+ while (true)
+ {
+ try
+ {
+ String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ System.out.println("<<< " + i++);
+ latch.countDown();
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ };
+ }.start();
+
+ String send = "SEND\n" + "destination:" + getQueueName() + "\n";
+ for (int i = 1; i <= count; i++)
+ {
+ // Thread.sleep(1);
+ System.out.println(">>> " + i);
+ sendFrame(stompSocket, send + "count:" + i + "\n\n" + Stomp.NULL);
+ }
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer();
+ server.start();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ stompSocket_2 = createSocket();
+ inputBuffer_2 = new ByteArrayOutputStream();
+
+ }
+
+ private HornetQServer createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ config.getQueueConfigurations().add(new CoreQueueConfiguration(getQueueName(), getQueueName(), null, false));
+ return HornetQServers.newHornetQServer(config);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+
+ if (stompSocket_2 != null)
+ {
+ stompSocket_2.close();
+ }
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName()
+ {
+ return "test";
+ }
+
+ public void sendFrame(Socket socket, String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = socket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(Socket socket, ByteArrayOutputStream inputBuffer, long timeOut) throws Exception
+ {
+ socket.setSoTimeout((int)timeOut);
+ InputStream is = socket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+}
14 years, 10 months
JBoss hornetq SVN: r8897 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-02-23 11:03:09 -0500 (Tue, 23 Feb 2010)
New Revision: 8897
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* send Stomp messages in a separate thread
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-23 15:56:30 UTC (rev 8896)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-23 16:03:09 UTC (rev 8897)
@@ -542,7 +542,13 @@
public void done()
{
- doSend(connection, frame);
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ doSend(connection, frame);
+ }
+ });
}
});
}
14 years, 10 months
JBoss hornetq SVN: r8896 - in trunk: tests/src/org/hornetq/tests/integration/jms/server/management and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-02-23 10:56:30 -0500 (Tue, 23 Feb 2010)
New Revision: 8896
Modified:
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-311: DestroyTopic management method does not delete the core resources associated to the topic
* when a topic is destroy, destroy *all the core queues bound to the topic address* too.
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-02-23 13:55:56 UTC (rev 8895)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-02-23 15:56:30 UTC (rev 8896)
@@ -28,6 +28,8 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.AddressControl;
+import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DiscoveryGroupConfiguration;
@@ -346,8 +348,14 @@
destinations.remove(name);
jmsManagementService.unregisterTopic(name);
- server.getHornetQServerControl().destroyQueue(HornetQDestination.createTopicAddressFromName(name).toString());
-
+ AddressControl addressControl = (AddressControl)server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
+ if (addressControl != null)
+ {
+ for (String queueName : addressControl.getQueueNames())
+ {
+ server.getHornetQServerControl().destroyQueue(queueName);
+ }
+ }
return true;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-02-23 13:55:56 UTC (rev 8895)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-02-23 15:56:30 UTC (rev 8896)
@@ -13,18 +13,19 @@
package org.hornetq.tests.integration.jms.server.management;
-import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
+import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DiscoveryGroupConfiguration;
@@ -35,6 +36,8 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
@@ -176,15 +179,29 @@
UnitTestCase.checkNoBinding(context, topicJNDIBinding);
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
-
+
JMSServerControl control = createManagementControl();
control.createTopic(topicName, topicJNDIBinding);
- UnitTestCase.checkBinding(context, topicJNDIBinding);
checkResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
+ Topic topic = (Topic)context.lookup(topicJNDIBinding);
+ assertNotNull(topic);
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // create a consumer will create a Core queue bound to the topic address
+ session.createConsumer(topic);
+ String topicAddress = HornetQDestination.createTopicAddressFromName(topicName).toString();
+ AddressControl addressControl = (AddressControl)server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + topicAddress);
+ assertNotNull(addressControl);
+
+ assertTrue(addressControl.getQueueNames().length > 0);
+
+ connection.close();
control.destroyTopic(topicName);
-
+
+ assertNull(server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + topicAddress));
UnitTestCase.checkNoBinding(context, topicJNDIBinding);
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
}
14 years, 10 months