[jboss-cvs] JBoss Messaging SVN: r4153 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 6 14:51:11 EDT 2008
Author: timfox
Date: 2008-05-06 14:51:11 -0400 (Tue, 06 May 2008)
New Revision: 4153
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
Log:
Disabled flow control for now
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-06 18:51:11 UTC (rev 4153)
@@ -126,26 +126,28 @@
{
ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
- //We only flow control with non-anonymous producers
- if (address == null)
- {
- while (windowSize == 0)
- {
- synchronized (this)
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
-
- windowSize--;
- }
+ //TODO flow control disabled for now
+// //We only flow control with non-anonymous producers
+// if (address == null)
+// {
+// while (windowSize == 0)
+// {
+// synchronized (this)
+// {
+// try
+// {
+// wait();
+// }
+// catch (InterruptedException e)
+// {
+// }
+// }
+// }
+//
+// windowSize--;
+// }
+
if (msg.isDurable())
{
remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
@@ -155,12 +157,12 @@
remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
}
- if (rateLimiter != null)
- {
- // Rate flow control
-
- rateLimiter.limit();
- }
+// if (rateLimiter != null)
+// {
+// // Rate flow control
+//
+// rateLimiter.limit();
+// }
}
public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java 2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java 2008-05-06 18:51:11 UTC (rev 4153)
@@ -60,7 +60,7 @@
PacketDispatcher previousDispatcher = localDispatchers.get(key);
localDispatchers.put(key, serverDispatcher);
- if(log.isDebugEnabled())
+ if (log.isDebugEnabled())
{
log.debug("registered " + key + " for " + serverDispatcher);
}
@@ -89,8 +89,6 @@
assert location != null;
String key = location.getLocation();
- log.info("*** Getting connector for " + location);
-
if (connectors.containsKey(key))
{
NIOConnectorHolder holder = connectors.get(key);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-05-06 18:51:11 UTC (rev 4153)
@@ -6,7 +6,8 @@
*/
package org.jboss.messaging.core.remoting.impl.invm;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.NIOSession;
@@ -31,6 +32,7 @@
private final PacketDispatcher clientDispatcher;
private final PacketDispatcher serverDispatcher;
private boolean connected;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(INVMSession.class);
@@ -53,6 +55,7 @@
public boolean close()
{
+ executor.shutdownNow();
connected = false;
return true;
}
@@ -73,25 +76,42 @@
{
// assert packet instanceof Packet;
- serverDispatcher.dispatch((Packet) packet,
- new PacketSender()
+ //Must be executed on different thread
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
{
- public void send(Packet response) throws Exception
- {
- serverDispatcher.callFilters(response);
- clientDispatcher.dispatch(response, null);
- }
-
- public long getSessionID()
- {
- return getID();
- }
-
- public String getRemoteAddress()
- {
- return "invm";
- }
- });
+ serverDispatcher.dispatch((Packet) packet,
+ new PacketSender()
+ {
+ public void send(Packet response) throws Exception
+ {
+ serverDispatcher.callFilters(response);
+ clientDispatcher.dispatch(response, null);
+ }
+
+ public long getSessionID()
+ {
+ return getID();
+ }
+
+ public String getRemoteAddress()
+ {
+ return "invm";
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to execute dispatch", e);
+ }
+ }
+ });
+
+
}
// public Object writeAndBlock(final Packet request, long timeout, TimeUnit timeUnit) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-06 18:51:11 UTC (rev 4153)
@@ -1052,11 +1052,13 @@
final int maxRateToUse = maxRate;
- if (address != null)
- {
- flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
- }
+ // TODO Flow control disabled for now
+// if (address != null)
+// {
+// flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+// }
+
long id = dispatcher.generateID();
ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-06 18:00:33 UTC (rev 4152)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-06 18:51:11 UTC (rev 4153)
@@ -1,24 +1,24 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.test.messaging.jms;
import java.io.Serializable;
@@ -47,7 +47,7 @@
// Constants -----------------------------------------------------
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
// Constructors --------------------------------------------------
@@ -62,62 +62,62 @@
public void testSendForeignWithForeignDestinationSet() throws Exception
{
Connection conn = null;
-
+
try
{
- conn = cf.createConnection();
+ conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer p = sess.createProducer(queue1);
-
+
MessageConsumer c = sess.createConsumer(queue1);
conn.start();
-
+
Message foreign = new SimpleJMSMessage(new SimpleDestination());
-
+
foreign.setJMSDestination(new SimpleDestination());
-
+
//the producer destination should override the foreign destination and the send should succeed
-
+
p.send(foreign);
Message m = c.receive(1000);
-
+
assertNotNull(m);
-
+
}
finally
{
conn.close();
}
}
-
+
private static class SimpleDestination implements Destination, Serializable
{
}
-
+
public void testSendToQueuePersistent() throws Exception
{
- sendToQueue(true);
+ sendToQueue(true);
}
-
+
public void testSendToQueueNonPersistent() throws Exception
{
- sendToQueue(false);
+ sendToQueue(false);
}
-
+
private void sendToQueue(boolean persistent) throws Exception
{
Connection pconn = null;
Connection cconn = null;
-
+
try
{
- pconn = cf.createConnection();
- cconn = cf.createConnection();
-
+ pconn = cf.createConnection();
+ cconn = cf.createConnection();
+
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = ps.createProducer(queue1);
@@ -140,16 +140,54 @@
}
}
+ public void testSpeed() throws Exception
+ {
+ Connection pconn = null;
+
+ try
+ {
+ pconn = cf.createConnection();
+
+ Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer p = ps.createProducer(queue1);
+
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final int numMessages = 10000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ Message msg = ps.createMessage();
+
+ p.send(msg);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ log.info("rate " + actualRate + " msgs /sec");
+
+ }
+ finally
+ {
+
+ }
+ }
+
public void testTransactedSendPersistent() throws Exception
{
- transactedSend(true);
+ transactedSend(true);
}
-
+
public void testTransactedSendNonPersistent() throws Exception
{
- transactedSend(false);
+ transactedSend(false);
}
-
+
private void transactedSend(boolean persistent) throws Exception
{
Connection pconn = null;
@@ -157,9 +195,9 @@
try
{
- pconn = cf.createConnection();
- cconn = cf.createConnection();
-
+ pconn = cf.createConnection();
+ cconn = cf.createConnection();
+
cconn.start();
Session ts = pconn.createSession(true, -1);
@@ -183,7 +221,7 @@
cconn.close();
}
}
-
+
//I moved this into it's own class so we can catch any exception that occurs
//Since this test intermittently fails.
//(As an aside, technically this test is invalid anyway since the sessions is used for sending
@@ -191,18 +229,18 @@
private class Sender implements Runnable
{
volatile Exception ex;
-
+
MessageProducer prod;
-
+
Message m;
-
+
Sender(MessageProducer prod, Message m)
{
this.prod = prod;
-
+
this.m = m;
}
-
+
public synchronized void run()
{
try
@@ -212,20 +250,20 @@
catch(Exception e)
{
log.error(e);
-
+
ex = e;
}
}
}
-
+
public void testPersistentSendToTopic() throws Exception
{
- sendToTopic(true);
+ sendToTopic(true);
}
-
+
public void testNonPersistentSendToTopic() throws Exception
{
- sendToTopic(false);
+ sendToTopic(false);
}
private void sendToTopic(boolean persistent) throws Exception
@@ -239,15 +277,15 @@
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer p = ps.createProducer(topic1);
-
+
p.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+
MessageConsumer c = cs.createConsumer(topic1);
cconn.start();
TextMessage m1 = ps.createTextMessage("test");
-
+
Sender sender = new Sender(p, m1);
Thread t = new Thread(sender, "Producer Thread");
@@ -255,7 +293,7 @@
t.start();
TextMessage m2 = (TextMessage)c.receive(5000);
-
+
if (sender.ex != null)
{
//If an exception was caught in sending we rethrow here so as not to lose it
@@ -273,9 +311,9 @@
cconn.close();
}
}
-
+
/**
* Test sending via anonymous producer
* */
@@ -342,7 +380,7 @@
p.send(m);
TextMessage rec = (TextMessage)c.receive(3000);
-
+
assertEquals("something", rec.getText());
}
@@ -395,36 +433,36 @@
pconn.close();
}
}
-
+
//Is this test valid?
//How can we check if the destination is valid if it is created on the client side only??
// TODO - verify what spec says about this and enable/delete the test accordingly
-// public void testCreateProducerOnInexistentDestination() throws Exception
-// {
-// Connection pconn = cf.createConnection();
-//
-// try
-// {
-// Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// try
-// {
-// ps.createProducer(new JBossTopic("NoSuchTopic"));
-// fail("should throw exception");
-// }
-// catch(InvalidDestinationException e)
-// {
-// // OK
-// }
-// }
-// finally
-// {
-// pconn.close();
-// }
-// }
+// public void testCreateProducerOnInexistentDestination() throws Exception
+// {
+// Connection pconn = cf.createConnection();
+// try
+// {
+// Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+// try
+// {
+// ps.createProducer(new JBossTopic("NoSuchTopic"));
+// fail("should throw exception");
+// }
+// catch(InvalidDestinationException e)
+// {
+// // OK
+// }
+// }
+// finally
+// {
+// pconn.close();
+// }
+// }
+
//
// disabled MessageID tests
//
@@ -781,13 +819,13 @@
}
// Package protected ---------------------------------------------
-
+
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
-
-
+
+
}
More information about the jboss-cvs-commits
mailing list