[jboss-cvs] JBoss Messaging SVN: r4186 - in trunk: docs/examples/messaging/src/org/jboss/messaging/example and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 14 04:05:11 EDT 2008
Author: timfox
Date: 2008-05-14 04:05:10 -0400 (Wed, 14 May 2008)
New Revision: 4186
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
Modified:
trunk/.classpath
trunk/build-messaging.xml
trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Several tweaks - interim commit - disabled pinging, etc.
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/.classpath 2008-05-14 08:05:10 UTC (rev 4186)
@@ -62,5 +62,5 @@
<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
<classpathentry exported="true" kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080418.144850.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080418.144850-sources.jar"/>
- <classpathentry kind="output" path="bin"/>
+ <classpathentry kind="output" path="eclipse-output"/>
</classpath>
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/build-messaging.xml 2008-05-14 08:05:10 UTC (rev 4186)
@@ -630,6 +630,7 @@
<target name="jms-tests" depends="jar, compile-jms-tests">
<echo message=""/>
<echo message="Running jms tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}"/>
+ <echo message="classpath is:${toString:unit.test.execution.classpath}"/>
<echo message=""/>
<mkdir dir="${test.output.dir}"/>
<mkdir dir="${test.reports.dir}"/>
Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -74,7 +74,7 @@
System.currentTimeMillis(), (byte) 1);
message.getBody().putString("Hello!");
clientProducer.send(message);
- ClientConsumer clientConsumer = clientSession.createConsumer(queue, null, false, false);
+ ClientConsumer clientConsumer = clientSession.createConsumer(queue);
clientConnection.start();
Message msg = clientConsumer.receive(5000);
System.out.println("msg.getPayload() = " + msg.getBody().getString());
Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -62,7 +62,7 @@
System.currentTimeMillis(), (byte) 1);
message.getBody().putString("Hello!");
clientProducer.send(message);
- ClientConsumer clientConsumer = clientSession.createConsumer(queue, null, false, false);
+ ClientConsumer clientConsumer = clientSession.createConsumer(queue);
clientConnection.start();
Message msg = clientConsumer.receive(5000);
System.out.println("msg.getPayload() = " + msg.getBody().getString());
Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -99,7 +99,7 @@
System.currentTimeMillis(), (byte) 1);
message.getBody().putString("Hello!");
clientProducer.send(message);
- ClientConsumer clientConsumer = clientSession.createConsumer(atestq, null, false, false);
+ ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
clientConnection.start();
Message msg = clientConsumer.receive(5000);
System.out.println("msg.getPayload() = " + msg.getBody().getString());
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -41,6 +41,8 @@
ClientBrowser createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
+ ClientBrowser createBrowser(SimpleString queueName) throws MessagingException;
+
ClientProducer createProducer(SimpleString address) throws MessagingException;
ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -166,12 +166,12 @@
remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
}
-// if (rateLimiter != null)
-// {
-// // Rate flow control
-//
-// rateLimiter.limit();
-// }
+ if (rateLimiter != null)
+ {
+ // Rate flow control
+
+ rateLimiter.limit();
+ }
}
public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -293,6 +293,11 @@
return consumer;
}
+ public ClientBrowser createBrowser(final SimpleString queueName) throws MessagingException
+ {
+ return createBrowser(queueName, null);
+ }
+
public ClientBrowser createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
{
checkClosed();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -23,6 +23,9 @@
import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
+import java.util.Timer;
+import java.util.TimerTask;
+
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -35,6 +38,7 @@
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
/**
*
@@ -97,7 +101,7 @@
log.trace(this + " started");
}
-
+
public void stop()
{
log.trace(this + " stop");
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -271,6 +271,11 @@
this.invmDisabledModified = true;
}
+ public void setSecurityEnabled(final boolean enabled)
+ {
+ this.securityEnabled = enabled;
+ }
+
public boolean isSSLEnabled()
{
if (System.getProperty(REMOTING_ENABLE_SSL_SYSPROP_KEY) != null && !sslEnabledModified)
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -35,7 +35,7 @@
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
-/**
+/**ConfigurationImpl
* This class allows the Configuration class to be configured via a config file.
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -131,7 +131,7 @@
this.journalTaskPeriod = getLong(e, "journal-task-period", 5000L);
this.securityEnabled = getBoolean(e, "security-enabled", true);
-
+
NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
ArrayList<String> interceptorList = new ArrayList<String>();
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -119,9 +119,8 @@
buff.putLong(timestamp);
buff.putByte(priority);
properties.encode(buff);
- buff.putInt(body.limit());
- buff.putBytes(body.array(), 0, body.limit());
-
+ buff.putInt(body.limit());
+ buff.putBytes(body.array(), 0, body.limit());
}
public int encodeSize()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -36,14 +36,17 @@
private static final long serialVersionUID = -4626926952268528384L;
- public static final Logger log = Logger.getLogger(PacketDispatcherImpl.class);
+ public static final Logger log = Logger
+ .getLogger(PacketDispatcherImpl.class);
+ private static boolean trace = log.isTraceEnabled();
+
// Attributes ----------------------------------------------------
private final Map<Long, PacketHandler> handlers;
public final List<Interceptor> filters;
private transient PacketHandlerRegistrationListener listener;
-
+
private final AtomicLong idSequence = new AtomicLong(0);
// Static --------------------------------------------------------
@@ -52,7 +55,7 @@
public PacketDispatcherImpl(final List<Interceptor> filters)
{
- handlers = new ConcurrentHashMap<Long, PacketHandler>();
+ handlers = new ConcurrentHashMap<Long, PacketHandler>();
this.filters = filters;
}
@@ -61,25 +64,26 @@
public long generateID()
{
long id = idSequence.getAndIncrement();
-
+
if (id == 0)
{
- //ID 0 is reserved for the connection factory handler
+ // ID 0 is reserved for the connection factory handler
id = generateID();
}
-
+
return id;
}
-
+
public void register(final PacketHandler handler)
- {
+ {
handlers.put(handler.getID(), handler);
- if (log.isDebugEnabled())
+ if (trace)
{
- log.debug("registered " + handler + " with ID " + handler.getID() + " (" + this + ")");
+ log.trace("registered " + handler + " with ID " + handler.getID()
+ + " (" + this + ")");
}
-
+
if (listener != null)
{
listener.handlerRegistered(handler.getID());
@@ -89,23 +93,23 @@
public void unregister(final long handlerID)
{
PacketHandler handler = handlers.remove(handlerID);
-
+
if (handler == null)
{
log.warn("no handler defined for " + handlerID);
- dump();
+ dump();
}
- if (log.isDebugEnabled())
+ if (trace)
{
- log.debug("unregistered " + handler);
+ log.trace("unregistered " + handler);
}
-
+
if (listener != null)
{
listener.handlerUnregistered(handlerID);
}
}
-
+
public void setListener(final PacketHandlerRegistrationListener listener)
{
this.listener = listener;
@@ -115,20 +119,21 @@
{
return handlers.get(handlerID);
}
-
- public void dispatch(final Packet packet, final PacketReturner sender) throws Exception
+
+ public void dispatch(final Packet packet, final PacketReturner sender)
+ throws Exception
{
long targetID = packet.getTargetID();
if (NO_ID_SET == targetID)
{
- log.error("Packet is not handled, it has no targetID: " + packet + ": " + System.identityHashCode(packet));
+ log.error("Packet is not handled, it has no targetID: " + packet
+ + ": " + System.identityHashCode(packet));
return;
}
PacketHandler handler = getHandler(targetID);
if (handler != null)
{
- if (log.isTraceEnabled())
- log.trace(handler + " handles " + packet);
+ if (trace) log.trace(handler + " handles " + packet);
callFilters(packet);
handler.handle(packet, sender);
@@ -142,27 +147,29 @@
/** Call filters on a package */
public void callFilters(Packet packet) throws Exception
{
- if (filters != null)
- {
- for (Interceptor filter: filters)
- {
- filter.intercept(packet);
- }
- }
+ if (filters != null)
+ {
+ for (Interceptor filter : filters)
+ {
+ filter.intercept(packet);
+ }
+ }
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- private void dump()
+ private void dump()
{
if (log.isDebugEnabled())
{
- StringBuffer buf = new StringBuffer("Registered PacketHandlers (" + this + "):\n");
- Iterator<Entry<Long, PacketHandler>> iterator = handlers.entrySet().iterator();
+ StringBuffer buf = new StringBuffer("Registered PacketHandlers ("
+ + this + "):\n");
+ Iterator<Entry<Long, PacketHandler>> iterator = handlers.entrySet()
+ .iterator();
while (iterator.hasNext())
{
Map.Entry<java.lang.Long, org.jboss.messaging.core.remoting.PacketHandler> entry = (Map.Entry<java.lang.Long, org.jboss.messaging.core.remoting.PacketHandler>) iterator
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -87,32 +87,5 @@
// Package protected ---------------------------------------------
-// static void addMDCFilter(final DefaultIoFilterChainBuilder filterChain)
-// {
-// assert filterChain != null;
-//
-// MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
-// filterChain.addLast("mdc", mdcInjectionFilter);
-// }
-//
-// static void addLoggingFilter(final DefaultIoFilterChainBuilder filterChain)
-// {
-// assert filterChain != null;
-//
-// LoggingFilter filter = new LoggingFilter();
-//
-// filter.setSessionCreatedLogLevel(TRACE);
-// filter.setSessionOpenedLogLevel(TRACE);
-// filter.setSessionIdleLogLevel(TRACE);
-// filter.setSessionClosedLogLevel(TRACE);
-//
-// filter.setMessageReceivedLogLevel(TRACE);
-// filter.setMessageSentLogLevel(TRACE);
-//
-// filter.setExceptionCaughtLogLevel(WARN);
-//
-// filterChain.addLast("logger", filter);
-// }
-
// Inner classes -------------------------------------------------
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionAttributeMap;
+import org.apache.mina.common.IoSessionDataStructureFactory;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
+import org.apache.mina.util.CircularQueue;
+
+/**
+ *
+ * A MessagingIOSessionDataStructureFactory
+ *
+ * @author The Apache MINA Project (dev at mina.apache.org)
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class MessagingIOSessionDataStructureFactory implements IoSessionDataStructureFactory
+{
+
+ public IoSessionAttributeMap getAttributeMap(IoSession session)
+ throws Exception
+ {
+ return new ConcurrentIoSessionAttributeMap();
+ }
+
+ public WriteRequestQueue getWriteRequestQueue(IoSession session)
+ throws Exception
+ {
+ return new DefaultWriteRequestQueue();
+ }
+
+
+ private static class ConcurrentIoSessionAttributeMap implements IoSessionAttributeMap {
+
+ private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
+
+ public Object getAttribute(IoSession session, Object key, Object defaultValue) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+
+ Object answer = attributes.get(key);
+ if (answer == null) {
+ return defaultValue;
+ } else {
+ return answer;
+ }
+ }
+
+ public Object setAttribute(IoSession session, Object key, Object value) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+
+ if (value == null) {
+ return attributes.remove(key);
+ } else {
+ return attributes.put(key, value);
+ }
+ }
+
+ public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+
+ if (value == null) {
+ return null;
+ }
+
+ return attributes.putIfAbsent(key, value);
+ }
+
+ public Object removeAttribute(IoSession session, Object key) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+
+ return attributes.remove(key);
+ }
+
+ public boolean removeAttribute(IoSession session, Object key, Object value) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+
+ if (value == null) {
+ return false;
+ }
+
+ return attributes.remove(key, value);
+ }
+
+ public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
+ return attributes.replace(key, oldValue, newValue);
+ }
+
+ public boolean containsAttribute(IoSession session, Object key) {
+ return attributes.containsKey(key);
+ }
+
+ public Set<Object> getAttributeKeys(IoSession session) {
+ return new HashSet<Object>(attributes.keySet());
+ }
+
+ public void dispose(IoSession session) throws Exception {
+ }
+ }
+
+
+ private static class DefaultWriteRequestQueue implements WriteRequestQueue
+ {
+ private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
+
+ public void dispose(IoSession session) {
+ }
+
+ public void clear(IoSession session) {
+ q.clear();
+ }
+
+ public synchronized boolean isEmpty(IoSession session) {
+ return q.isEmpty();
+ }
+
+ public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+ q.offer(writeRequest);
+ }
+
+ public synchronized WriteRequest poll(IoSession session) {
+ return q.poll();
+ }
+
+ @Override
+ public String toString() {
+ return q.toString();
+ }
+ }
+
+// private static class DefaultWriteRequestQueue implements WriteRequestQueue
+// {
+// private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+//
+// public void dispose(IoSession session) {
+// }
+//
+// public void clear(IoSession session) {
+// q.clear();
+// }
+//
+// public synchronized boolean isEmpty(IoSession session) {
+// return q.isEmpty();
+// }
+//
+// public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+// q.offer(writeRequest);
+// }
+//
+// public synchronized WriteRequest poll(IoSession session) {
+// return q.poll();
+// }
+//
+// @Override
+// public String toString() {
+// return q.toString();
+// }
+// }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -49,8 +49,10 @@
{
// Constants -----------------------------------------------------
- private final Logger log = Logger.getLogger(MinaConnector.class);
+ private static final Logger log = Logger.getLogger(MinaConnector.class);
+ private static boolean trace = log.isTraceEnabled();
+
// Attributes ----------------------------------------------------
private Location location;
@@ -67,6 +69,8 @@
private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
private IoServiceListenerAdapter ioListener;
+
+ private MinaHandler handler;
// Static --------------------------------------------------------
@@ -103,6 +107,8 @@
this.connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
+
+ connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
// addMDCFilter(filterChain);
if (connectionParams.isSSLEnabled())
@@ -118,9 +124,8 @@
}
}
addCodecFilter(filterChain);
- // addLoggingFilter(filterChain);
- addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
- connectionParams.getKeepAliveTimeout(), this);
+// addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
+// connectionParams.getKeepAliveTimeout(), this);
connector.getSessionConfig().setTcpNoDelay(connectionParams.isTcpNoDelay());
int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
if (receiveBufferSize != -1)
@@ -142,27 +147,28 @@
{
if (session != null && session.isConnected())
{
- return new MinaSession(session);
+ return new MinaSession(session, handler);
}
threadPool = Executors.newCachedThreadPool();
- connector.setHandler(new MinaHandler(dispatcher, threadPool, this, false));
+ handler = new MinaHandler(dispatcher, threadPool, this, false);
+ connector.setHandler(handler);
InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
ConnectFuture future = connector.connect(address);
connector.setDefaultRemoteAddress(address);
ioListener = new IoServiceListenerAdapter();
connector.addListener(ioListener);
-
+
future.awaitUninterruptibly();
if (!future.isConnected())
{
throw new IOException("Cannot connect to " + address.toString());
}
this.session = future.getSession();
- Packet packet = new Ping(session.getId());
- session.write(packet);
+// Packet packet = new Ping(session.getId());
+// session.write(packet);
- return new MinaSession(session);
+ return new MinaSession(session, handler);
}
public boolean disconnect()
@@ -208,7 +214,7 @@
listeners.add(listener);
- if (log.isTraceEnabled())
+ if (trace)
log.trace("added listener " + listener + " to " + this);
}
@@ -219,7 +225,7 @@
listeners.remove(listener);
- if (log.isTraceEnabled())
+ if (trace)
log.trace("removed listener " + listener + " from " + this);
}
@@ -269,25 +275,25 @@
public void serviceActivated(IoService service)
{
- if (log.isTraceEnabled())
+ if (trace)
log.trace("activated " + service);
}
public void serviceDeactivated(IoService service)
{
- if (log.isTraceEnabled())
+ if (trace)
log.trace("deactivated " + service);
}
public void serviceIdle(IoService service, IdleStatus idleStatus)
{
- if (log.isTraceEnabled())
+ if (trace)
log.trace("idle " + service + ", status=" + idleStatus);
}
public void sessionCreated(IoSession session)
{
- if (log.isTraceEnabled())
+ if (trace)
log.trace("created session " + session);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -10,10 +10,10 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
@@ -21,7 +21,6 @@
import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.util.OrderedExecutorFactory;
/**
@@ -31,12 +30,15 @@
* @version <tt>$Revision$</tt>
*
*/
-public class MinaHandler extends IoHandlerAdapter implements PacketHandlerRegistrationListener
+public class MinaHandler extends IoHandlerAdapter implements
+ PacketHandlerRegistrationListener
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MinaHandler.class);
+ private static boolean trace = log.isTraceEnabled();
+
// Attributes ----------------------------------------------------
private final PacketDispatcher dispatcher;
@@ -47,22 +49,24 @@
private final OrderedExecutorFactory executorFactory;
- //Note! must use ConcurrentMap here to avoid race condition
+ // Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public MinaHandler(final PacketDispatcher dispatcher, final ExecutorService executorService,
- final CleanUpNotifier failureNotifier, final boolean closeSessionOnExceptionCaught)
+ public MinaHandler(final PacketDispatcher dispatcher,
+ final ExecutorService executorService,
+ final CleanUpNotifier failureNotifier,
+ final boolean closeSessionOnExceptionCaught)
{
- assert dispatcher!= null;
+ assert dispatcher != null;
assert executorService != null;
-
+
this.dispatcher = dispatcher;
this.failureNotifier = failureNotifier;
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
-
+
this.executorFactory = new OrderedExecutorFactory(executorService);
this.dispatcher.setListener(this);
}
@@ -75,7 +79,7 @@
{
// do nothing on registration
}
-
+
public void handlerUnregistered(final long handlerID)
{
executors.remove(handlerID);
@@ -88,12 +92,12 @@
throws Exception
{
log.error("caught exception " + cause + " for session " + session, cause);
-
+
if (failureNotifier != null)
{
long serverSessionID = session.getId();
- MessagingException me =
- new MessagingException(MessagingException.INTERNAL_ERROR, "unexpected exception");
+ MessagingException me = new MessagingException(
+ MessagingException.INTERNAL_ERROR, "unexpected exception");
me.initCause(cause);
failureNotifier.fireCleanup(serverSessionID, me);
}
@@ -102,43 +106,29 @@
session.close();
}
}
-
+
@Override
public void messageReceived(final IoSession session, final Object message)
- throws Exception
+ throws Exception
{
- if (message instanceof Ping)
- {
- if (log.isTraceEnabled())
- log.trace("received ping " + message);
- // response is handled by the keep-alive filter.
- // do nothing
- return;
- }
-
- if (!(message instanceof Packet))
- {
- throw new IllegalArgumentException("Unknown message type: " + message);
- }
-
final Packet packet = (Packet) message;
long executorID = packet.getExecutorID();
-
+
Executor executor = executors.get(executorID);
if (executor == null)
{
executor = executorFactory.getOrderedExecutor();
-
+
Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-
+
if (oldExecutor != null)
{
- //Avoid race
- executor = oldExecutor;
+ //Avoid race
+ executor = oldExecutor;
}
}
-
- executor.execute(new Runnable()
+
+ executor.execute(new Runnable()
{
public void run()
{
@@ -151,9 +141,62 @@
}
}
});
-
+
}
+ private final int high = 2000;
+
+ private final int low = 500;
+
+ private AtomicInteger count = new AtomicInteger(0);
+
+ private volatile boolean blocked = true;
+
+ @Override
+ public void messageSent(final IoSession session, final Object message)
+ throws Exception
+ {
+ int newcount = count.decrementAndGet();
+
+ if (blocked)
+ {
+ if (newcount == low)
+ {
+ blocked = false;
+
+ // log.info("unblocking");
+
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ }
+
+ }
+
+ public void acquireSemaphore() throws Exception
+ {
+ // if (!sem.tryAcquire(5000, TimeUnit.MILLISECONDS))
+ // {
+ // throw new IllegalStateException("Timed out");
+ // }
+ int newcount = count.incrementAndGet();
+
+ if (newcount == high)
+ {
+ blocked = true;
+
+ // log.info("blocking");
+
+ synchronized (this)
+ {
+ this.wait(5000);
+ }
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -164,22 +207,22 @@
throws Exception
{
PacketReturner returner;
-
+
if (packet.getResponseTargetID() != EmptyPacket.NO_ID_SET)
- {
+ {
returner = new PacketReturner()
{
public void send(Packet p) throws Exception
{
dispatcher.callFilters(p);
- session.write(p);
+ session.write(p);
}
-
+
public long getSessionID()
{
return session.getId();
}
-
+
public String getRemoteAddress()
{
return session.getRemoteAddress().toString();
@@ -191,11 +234,10 @@
returner = null;
}
- if (log.isTraceEnabled())
- log.trace("received packet " + packet);
+ if (trace) log.trace("received packet " + packet);
dispatcher.dispatch(packet, returner);
}
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -10,7 +10,6 @@
import static org.jboss.messaging.core.remoting.TransportType.INVM;
import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
import java.net.InetSocketAddress;
@@ -128,6 +127,9 @@
&& acceptor == null)
{
acceptor = new NioSocketAcceptor();
+
+ acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
+
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
// addMDCFilter(filterChain);
@@ -139,9 +141,6 @@
.getTrustStorePassword());
}
addCodecFilter(filterChain);
- // addLoggingFilter(filterChain);
- addKeepAliveFilter(filterChain, factory,
- config.getKeepAliveInterval(), config.getKeepAliveTimeout(), this);
// Bind
acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
@@ -258,7 +257,8 @@
// Inner classes -------------------------------------------------
- private final class MinaSessionListener implements IoServiceListener {
+ private final class MinaSessionListener implements IoServiceListener
+ {
public void serviceActivated(IoService service)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -7,6 +7,7 @@
package org.jboss.messaging.core.remoting.impl.mina;
import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.Packet;
@@ -20,22 +21,26 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(MinaConnector.class);
+
+
// Attributes ----------------------------------------------------
private final IoSession session;
- //private AtomicLong correlationCounter;
+ private MinaHandler handler;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public MinaSession(IoSession session)
+ public MinaSession(IoSession session, MinaHandler handler)
{
assert session != null;
this.session = session;
- // correlationCounter = new AtomicLong(0);
+
+ this.handler = handler;
}
// Public --------------------------------------------------------
@@ -46,7 +51,16 @@
}
public void write(Packet packet)
- {
+ {
+ try
+ {
+ handler.acquireSemaphore();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to acquire sem", e);
+ }
+
session.write(packet);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -100,7 +100,7 @@
// }
}
- session.send(message);
+ session.send(message);
}
public void sendCredits() throws Exception
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -55,6 +55,22 @@
// Constructors --------------------------------------------------
+ public static void main(String[] args)
+ {
+ try
+ {
+ MessageProducerTest test = new MessageProducerTest();
+
+ test.setUp();
+ test.testSpeed3();
+ test.tearDown();
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ }
+ }
+
public MessageProducerTest(String name)
{
super(name);
@@ -197,75 +213,82 @@
pconn.close();
}
}
-//
-// public void testSpeed2() throws Exception
-// {
-// Connection pconn = null;
-//
-// try
-// {
-// pconn = cf.createConnection();
-//
-// Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-// MessageProducer p = ps.createProducer(queue1);
-//
-// MessageConsumer cons = ps.createConsumer(queue1);
-//
-// pconn.start();
-//
-// p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//
-// p.setDisableMessageID(true);
-// p.setDisableMessageTimestamp(true);
-//
-// final int numMessages = 100000;
-//
-// long start = System.currentTimeMillis();
-//
-// BytesMessage msg = ps.createBytesMessage();
-//
-// msg.writeBytes(new byte[1000]);
-//
-// final CountDownLatch latch = new CountDownLatch(1);
-//
-// class MyListener implements MessageListener
-// {
-// int count;
-//
-// public void onMessage(Message msg)
-// {
-// count++;
-//
-// if (count == numMessages)
-// {
-// latch.countDown();
-// }
-// }
-// }
-//
-// cons.setMessageListener(new MyListener());
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// p.send(msg);
-// }
-//
-// latch.await();
-//
-// long end = System.currentTimeMillis();
-//
-// double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-// log.info("rate " + actualRate + " msgs /sec");
-//
-// }
-// finally
-// {
-// pconn.close();
-// }
-// }
-//
+
+ public void testSpeed2() throws Exception
+ {
+ Connection pconn = null;
+
+ try
+ {
+ pconn = cf.createConnection();
+
+ Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageProducer p = ps.createProducer(queue1);
+
+ MessageConsumer cons = ps.createConsumer(queue1);
+
+ pconn.start();
+
+ p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ p.setDisableMessageID(true);
+ p.setDisableMessageTimestamp(true);
+
+ final int numMessages = 10000;
+
+ long start = System.currentTimeMillis();
+
+ BytesMessage msg = ps.createBytesMessage();
+
+ msg.writeBytes(new byte[1000]);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements MessageListener
+ {
+ int count;
+
+ public void onMessage(Message msg)
+ {
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ cons.setMessageListener(new MyListener());
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(msg);
+ }
+
+ latch.await();
+
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ log.info("rate " + actualRate + " msgs /sec");
+
+ }
+ finally
+ {
+ pconn.close();
+ }
+ }
+
+ public MessageProducerTest()
+ {
+ super("MessageProducerTest");
+ }
+
+
+
public void testSpeed3() throws Exception
{
Connection pconn = null;
@@ -310,24 +333,37 @@
cons.setMessageListener(new MyListener());
+ long start = System.currentTimeMillis();
+
+
for (int i = 0; i < numMessages; i++)
{
p.send(msg);
}
- long start = System.currentTimeMillis();
+
+ long end = System.currentTimeMillis();
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+ log.info("send rate " + actualRate + " msgs /sec");
+
+ log.info("Sleeping");
+
+ Thread.sleep(10000);
+
+ log.info("Let's go....");
+
pconn.start();
latch.await();
- long end = System.currentTimeMillis();
+ end = System.currentTimeMillis();
- double actualRate = 1000 * (double)numMessages / ( end - start);
+ actualRate = 1000 * (double)numMessages / ( end - start);
- log.info("rate " + actualRate + " msgs /sec");
+ log.info("consume rate " + actualRate + " msgs /sec");
}
finally
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -43,6 +43,7 @@
super.setUp();
conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
conf.setTransport(TransportType.TCP);
conf.setHost("localhost");
server = new MessagingServerImpl(conf);
@@ -56,35 +57,35 @@
super.tearDown();
}
+//
+//
+// public void testCoreClient() throws Exception
+// {
+// Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+//
+// ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+// ClientConnection conn = cf.createConnection();
+//
+// ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
+// session.createQueue(QUEUE, QUEUE, null, false, false);
+//
+// ClientProducer producer = session.createProducer(QUEUE);
+//
+// ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+// System.currentTimeMillis(), (byte) 1);
+// message.getBody().putString("testINVMCoreClient");
+// producer.send(message);
+//
+// ClientConsumer consumer = session.createConsumer(QUEUE);
+// conn.start();
+//
+// message = consumer.receive(1000);
+//
+// assertEquals("testINVMCoreClient", message.getBody().getString());
+//
+// conn.close();
+// }
-
- public void testCoreClient() throws Exception
- {
- Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-
- ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
- ClientConnection conn = cf.createConnection();
-
- ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().putString("testINVMCoreClient");
- producer.send(message);
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
- conn.start();
-
- message = consumer.receive(1000);
-
- assertEquals("testINVMCoreClient", message.getBody().getString());
-
- conn.close();
- }
-
public static void main(String[] args)
{
try
@@ -107,6 +108,7 @@
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
cf.setDefaultConsumerWindowSize(-1);
+ // cf.setDefaultProducerMaxRate(30000);
ClientConnection conn = cf.createConnection();
@@ -118,9 +120,9 @@
ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte) 1);
- //byte[] bytes = new byte[1000];
+ byte[] bytes = new byte[1000];
- //message.getBody().putBytes(bytes);
+ message.getBody().putBytes(bytes);
message.getBody().flip();
@@ -128,9 +130,9 @@
ClientConsumer consumer = session.createConsumer(QUEUE);
final CountDownLatch latch = new CountDownLatch(1);
+//
+ final int numMessages = 50000;
- final int numMessages = 100000;
-
class MyHandler implements MessageHandler
{
int count;
@@ -156,15 +158,29 @@
}
consumer.setMessageHandler(new MyHandler());
+
+
//System.out.println("Waiting 10 secs");
// Thread.sleep(10000);
+
+
System.out.println("Starting");
- conn.start();
-
+
+ //Warmup
+ for (int i = 0; i < 50000; i++)
+ {
+ producer.send(message);
+ }
+//
+// System.out.println("Waiting 10 secs");
+//
+// Thread.sleep(10000);
+
+
long start = System.currentTimeMillis();
for (int i = 0; i < numMessages; i++)
@@ -172,6 +188,15 @@
producer.send(message);
}
+
+
+
+ //long end = System.currentTimeMillis();
+
+ //double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ //System.out.println("Send Rate is " + actualRate);
+
// long end = System.currentTimeMillis();
//
// double actualRate = 1000 * (double)numMessages / ( end - start);
@@ -182,7 +207,7 @@
// start = System.currentTimeMillis();
- latch.await();
+ // latch.await();
// long end = System.currentTimeMillis();
//
@@ -192,18 +217,39 @@
//conn.start();
-
- //start = System.currentTimeMillis();
-
+ //System.out.println("Waiting 10 secs");
+
long end = System.currentTimeMillis();
double actualRate = 1000 * (double)numMessages / ( end - start);
- System.out.println(" consume Rate is " + actualRate);
+ System.out.println("Rate is " + actualRate);
+ //Thread.sleep(10000);
+
+
+ // conn.start();
//
+//
+// start = System.currentTimeMillis();
+//
+
+// conn.start();
+////
+// start = System.currentTimeMillis();
+////
+// latch.await();
+////
+////
+// end = System.currentTimeMillis();
+//
+// actualRate = 1000 * (double)numMessages / ( end - start);
+//
+// System.out.println("Rate is " + actualRate);
+
+//
// message = consumer.receive(1000);
//
// assertEquals("testINVMCoreClient", message.getBody().getString());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -67,29 +67,6 @@
trustStorePassword = keystorePassword;
}
- public void testAddKeepAliveFilterWithIncorrectParameters() throws Exception
- {
- int keepAliveInterval = 5; // seconds
- int keepAliveTimeout = 10; // seconds
-
- DefaultIoFilterChainBuilder filterChain = new DefaultIoFilterChainBuilder();
- KeepAliveFactory factory = createMock(KeepAliveFactory.class);
- CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
-
- replay(factory, notifier);
-
- try
- {
- FilterChainSupport.addKeepAliveFilter(filterChain, factory,
- keepAliveInterval, keepAliveTimeout, notifier);
- fail("the interval must be greater than the timeout");
- } catch (IllegalArgumentException e)
- {
- }
-
- verify(factory, notifier);
- }
-
public void testSSLFilter() throws Exception
{
InetSocketAddress address = new InetSocketAddress("localhost", 9091);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-14 08:05:10 UTC (rev 4186)
@@ -43,17 +43,6 @@
// Public --------------------------------------------------------
- public void testReceiveNotAbstractPacket() throws Exception
- {
- try
- {
- handler.messageReceived(null, new Object());
- fail();
- } catch (IllegalArgumentException e)
- {
- }
- }
-
public void testReceiveUnhandledAbstractPacket() throws Exception
{
TextPacket packet = new TextPacket("testReceiveUnhandledAbstractPacket");
More information about the jboss-cvs-commits
mailing list