[jboss-cvs] JBoss Messaging SVN: r4186 - in trunk: docs/examples/messaging/src/org/jboss/messaging/example and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 14 04:05:11 EDT 2008


Author: timfox
Date: 2008-05-14 04:05:10 -0400 (Wed, 14 May 2008)
New Revision: 4186

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
Modified:
   trunk/.classpath
   trunk/build-messaging.xml
   trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
   trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
   trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Several tweaks - interim commit - disabled pinging etc


Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/.classpath	2008-05-14 08:05:10 UTC (rev 4186)
@@ -62,5 +62,5 @@
 	<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
 	<classpathentry exported="true" kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080418.144850.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080418.144850-sources.jar"/>
-	<classpathentry kind="output" path="bin"/>
+	<classpathentry kind="output" path="eclipse-output"/>
 </classpath>

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/build-messaging.xml	2008-05-14 08:05:10 UTC (rev 4186)
@@ -630,6 +630,7 @@
    <target name="jms-tests" depends="jar, compile-jms-tests">
       <echo message=""/>
       <echo message="Running jms tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}"/>
+      <echo message="classpath is:${toString:unit.test.execution.classpath}"/>
       <echo message=""/>
       <mkdir dir="${test.output.dir}"/>
       <mkdir dir="${test.reports.dir}"/>

Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SSLClient.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -74,7 +74,7 @@
                System.currentTimeMillis(), (byte) 1);
          message.getBody().putString("Hello!");
          clientProducer.send(message);
-         ClientConsumer clientConsumer = clientSession.createConsumer(queue, null, false, false);
+         ClientConsumer clientConsumer = clientSession.createConsumer(queue);
          clientConnection.start();
          Message msg = clientConsumer.receive(5000);
          System.out.println("msg.getPayload() = " + msg.getBody().getString());

Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -62,7 +62,7 @@
                  System.currentTimeMillis(), (byte) 1);
          message.getBody().putString("Hello!");
          clientProducer.send(message);
-         ClientConsumer clientConsumer = clientSession.createConsumer(queue, null, false, false);
+         ClientConsumer clientConsumer = clientSession.createConsumer(queue);
          clientConnection.start();
          Message msg = clientConsumer.receive(5000);
          System.out.println("msg.getPayload() = " + msg.getBody().getString());

Modified: trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java
===================================================================
--- trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/docs/examples/messaging/src/org/jboss/messaging/example/SimpleExample.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -99,7 +99,7 @@
                  System.currentTimeMillis(), (byte) 1);
          message.getBody().putString("Hello!");
          clientProducer.send(message);
-         ClientConsumer clientConsumer = clientSession.createConsumer(atestq, null, false, false);
+         ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
          clientConnection.start();
          Message msg = clientConsumer.receive(5000);
          System.out.println("msg.getPayload() = " + msg.getBody().getString());

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -41,6 +41,8 @@
    
    ClientBrowser createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
    
+   ClientBrowser createBrowser(SimpleString queueName) throws MessagingException;
+   
    ClientProducer createProducer(SimpleString address) throws MessagingException;
    
    ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -166,12 +166,12 @@
    	   remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
    	}
    	 	   	
-//   	if (rateLimiter != null)
-//   	{
-//   	   // Rate flow control
-//      	   		
-//   		rateLimiter.limit();
-//   	}
+   	if (rateLimiter != null)
+   	{
+   	   // Rate flow control
+      	   		
+   		rateLimiter.limit();
+   	}
    }
             
    public void registerAcknowledgementHandler(final AcknowledgementHandler handler)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -293,6 +293,11 @@
       return consumer;
    }
    
+   public ClientBrowser createBrowser(final SimpleString queueName) throws MessagingException
+   {
+      return createBrowser(queueName, null);
+   }
+   
    public ClientBrowser createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
    {
       checkClosed();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -23,6 +23,9 @@
 
 import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
 
+import java.util.Timer;
+import java.util.TimerTask;
+
 import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -35,6 +38,7 @@
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 
 /**
  * 
@@ -97,7 +101,7 @@
 
       log.trace(this + " started");
    }
-
+   
    public void stop()
    {
       log.trace(this + " stop");

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -271,6 +271,11 @@
       this.invmDisabledModified = true;
    }
    
+   public void setSecurityEnabled(final boolean enabled)
+   {
+      this.securityEnabled = enabled;
+   }
+   
    public boolean isSSLEnabled()
    {
       if (System.getProperty(REMOTING_ENABLE_SSL_SYSPROP_KEY) != null && !sslEnabledModified)

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -35,7 +35,7 @@
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
-/**
+/**ConfigurationImpl
  * This class allows the Configuration class to be configured via a config file.
  *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -131,7 +131,7 @@
       this.journalTaskPeriod = getLong(e, "journal-task-period", 5000L);
       
       this.securityEnabled = getBoolean(e, "security-enabled", true);
-            
+       
       NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
 
       ArrayList<String> interceptorList = new ArrayList<String>();

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -119,9 +119,8 @@
       buff.putLong(timestamp);
       buff.putByte(priority);
       properties.encode(buff);
-      buff.putInt(body.limit());
-      buff.putBytes(body.array(), 0, body.limit());
-   
+      buff.putInt(body.limit());     
+      buff.putBytes(body.array(), 0, body.limit());   
    }
    
    public int encodeSize()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -36,14 +36,17 @@
 
    private static final long serialVersionUID = -4626926952268528384L;
 
-   public static final Logger log = Logger.getLogger(PacketDispatcherImpl.class);
+   public static final Logger log = Logger
+         .getLogger(PacketDispatcherImpl.class);
 
+   private static boolean trace = log.isTraceEnabled();
+
    // Attributes ----------------------------------------------------
 
    private final Map<Long, PacketHandler> handlers;
    public final List<Interceptor> filters;
    private transient PacketHandlerRegistrationListener listener;
-   
+
    private final AtomicLong idSequence = new AtomicLong(0);
 
    // Static --------------------------------------------------------
@@ -52,7 +55,7 @@
 
    public PacketDispatcherImpl(final List<Interceptor> filters)
    {
-   	handlers = new ConcurrentHashMap<Long, PacketHandler>();
+      handlers = new ConcurrentHashMap<Long, PacketHandler>();
       this.filters = filters;
    }
 
@@ -61,25 +64,26 @@
    public long generateID()
    {
       long id = idSequence.getAndIncrement();
-      
+
       if (id == 0)
       {
-         //ID 0 is reserved for the connection factory handler
+         // ID 0 is reserved for the connection factory handler
          id = generateID();
       }
-      
+
       return id;
    }
-   
+
    public void register(final PacketHandler handler)
-   { 
+   {
       handlers.put(handler.getID(), handler);
 
-      if (log.isDebugEnabled())
+      if (trace)
       {
-         log.debug("registered " + handler + " with ID " + handler.getID() + " (" + this + ")");
+         log.trace("registered " + handler + " with ID " + handler.getID()
+               + " (" + this + ")");
       }
-      
+
       if (listener != null)
       {
          listener.handlerRegistered(handler.getID());
@@ -89,23 +93,23 @@
    public void unregister(final long handlerID)
    {
       PacketHandler handler = handlers.remove(handlerID);
-      
+
       if (handler == null)
       {
          log.warn("no handler defined for " + handlerID);
-         dump();      
+         dump();
       }
-      if (log.isDebugEnabled())
+      if (trace)
       {
-         log.debug("unregistered " + handler);
+         log.trace("unregistered " + handler);
       }
-      
+
       if (listener != null)
       {
          listener.handlerUnregistered(handlerID);
       }
    }
-   
+
    public void setListener(final PacketHandlerRegistrationListener listener)
    {
       this.listener = listener;
@@ -115,20 +119,21 @@
    {
       return handlers.get(handlerID);
    }
-   
-   public void dispatch(final Packet packet, final PacketReturner sender) throws Exception
+
+   public void dispatch(final Packet packet, final PacketReturner sender)
+         throws Exception
    {
       long targetID = packet.getTargetID();
       if (NO_ID_SET == targetID)
       {
-         log.error("Packet is not handled, it has no targetID: " + packet + ": " + System.identityHashCode(packet));
+         log.error("Packet is not handled, it has no targetID: " + packet
+               + ": " + System.identityHashCode(packet));
          return;
       }
       PacketHandler handler = getHandler(targetID);
       if (handler != null)
       {
-         if (log.isTraceEnabled())
-            log.trace(handler + " handles " + packet);
+         if (trace) log.trace(handler + " handles " + packet);
 
          callFilters(packet);
          handler.handle(packet, sender);
@@ -142,27 +147,29 @@
    /** Call filters on a package */
    public void callFilters(Packet packet) throws Exception
    {
-     if (filters != null)
-     {
-        for (Interceptor filter: filters)
-        {
-           filter.intercept(packet);          
-        }
-     }
+      if (filters != null)
+      {
+         for (Interceptor filter : filters)
+         {
+            filter.intercept(packet);
+         }
+      }
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
-   private void dump() 
+   private void dump()
    {
       if (log.isDebugEnabled())
       {
-         StringBuffer buf = new StringBuffer("Registered PacketHandlers (" + this + "):\n");
-         Iterator<Entry<Long, PacketHandler>> iterator = handlers.entrySet().iterator();
+         StringBuffer buf = new StringBuffer("Registered PacketHandlers ("
+               + this + "):\n");
+         Iterator<Entry<Long, PacketHandler>> iterator = handlers.entrySet()
+               .iterator();
          while (iterator.hasNext())
          {
             Map.Entry<java.lang.Long, org.jboss.messaging.core.remoting.PacketHandler> entry = (Map.Entry<java.lang.Long, org.jboss.messaging.core.remoting.PacketHandler>) iterator

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -87,32 +87,5 @@
    
    // Package protected ---------------------------------------------
 
-//   static void addMDCFilter(final DefaultIoFilterChainBuilder filterChain)
-//   {
-//      assert filterChain != null;
-//
-//      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
-//      filterChain.addLast("mdc", mdcInjectionFilter);
-//   }
-//
-//   static void addLoggingFilter(final DefaultIoFilterChainBuilder filterChain)
-//   {
-//      assert filterChain != null;
-//
-//      LoggingFilter filter = new LoggingFilter();
-//
-//      filter.setSessionCreatedLogLevel(TRACE);
-//      filter.setSessionOpenedLogLevel(TRACE);
-//      filter.setSessionIdleLogLevel(TRACE);
-//      filter.setSessionClosedLogLevel(TRACE);
-//
-//      filter.setMessageReceivedLogLevel(TRACE);
-//      filter.setMessageSentLogLevel(TRACE);
-//
-//      filter.setExceptionCaughtLogLevel(WARN);
-//
-//      filterChain.addLast("logger", filter);
-//   }
-
    // Inner classes -------------------------------------------------
 }

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionAttributeMap;
+import org.apache.mina.common.IoSessionDataStructureFactory;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
+import org.apache.mina.util.CircularQueue;
+
+/**
+ * 
+ * A MessagingIOSessionDataStructureFactory
+ * 
+ * @author The Apache MINA Project (dev at mina.apache.org)
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class MessagingIOSessionDataStructureFactory implements IoSessionDataStructureFactory
+{
+
+   public IoSessionAttributeMap getAttributeMap(IoSession session)
+         throws Exception
+   {
+      return new ConcurrentIoSessionAttributeMap();
+   }
+
+   public WriteRequestQueue getWriteRequestQueue(IoSession session)
+         throws Exception
+   {
+      return new DefaultWriteRequestQueue();
+   }
+   
+   
+   private static class ConcurrentIoSessionAttributeMap implements IoSessionAttributeMap {
+
+      private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
+
+      public Object getAttribute(IoSession session, Object key, Object defaultValue) {
+          if (key == null) {
+              throw new NullPointerException("key");
+          }
+
+          Object answer = attributes.get(key);
+          if (answer == null) {
+              return defaultValue;
+          } else {
+              return answer;
+          }
+      }
+
+      public Object setAttribute(IoSession session, Object key, Object value) {
+          if (key == null) {
+              throw new NullPointerException("key");
+          }
+
+          if (value == null) {
+              return attributes.remove(key);
+          } else {
+              return attributes.put(key, value);
+          }
+      }
+
+      public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
+          if (key == null) {
+              throw new NullPointerException("key");
+          }
+
+          if (value == null) {
+              return null;
+          }
+          
+          return attributes.putIfAbsent(key, value);
+      }
+
+      public Object removeAttribute(IoSession session, Object key) {
+          if (key == null) {
+              throw new NullPointerException("key");
+          }
+
+          return attributes.remove(key);
+      }
+
+      public boolean removeAttribute(IoSession session, Object key, Object value) {
+          if (key == null) {
+              throw new NullPointerException("key");
+          }
+
+          if (value == null) {
+              return false;
+          }
+          
+          return attributes.remove(key, value);
+      }
+
+      public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
+         return attributes.replace(key, oldValue, newValue);
+      }
+
+      public boolean containsAttribute(IoSession session, Object key) {
+          return attributes.containsKey(key);
+      }
+
+      public Set<Object> getAttributeKeys(IoSession session) {
+          return new HashSet<Object>(attributes.keySet());          
+      }
+
+      public void dispose(IoSession session) throws Exception {
+      }
+  }
+   
+   
+   private static class DefaultWriteRequestQueue implements WriteRequestQueue
+   {
+      private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
+      
+      public void dispose(IoSession session) {
+      }
+      
+      public void clear(IoSession session) {
+          q.clear();
+      }
+
+      public synchronized boolean isEmpty(IoSession session) {
+          return q.isEmpty();
+      }
+
+      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+          q.offer(writeRequest);
+      }
+
+      public synchronized WriteRequest poll(IoSession session) {
+          return q.poll();
+      }
+      
+      @Override
+      public String toString() {
+          return q.toString();
+      }
+  }
+   
+//   private static class DefaultWriteRequestQueue implements WriteRequestQueue
+//   {
+//      private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+//      
+//      public void dispose(IoSession session) {
+//      }
+//      
+//      public void clear(IoSession session) {
+//          q.clear();
+//      }
+//
+//      public synchronized boolean isEmpty(IoSession session) {
+//          return q.isEmpty();
+//      }
+//
+//      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+//          q.offer(writeRequest);
+//      }
+//
+//      public synchronized WriteRequest poll(IoSession session) {
+//          return q.poll();
+//      }
+//      
+//      @Override
+//      public String toString() {
+//          return q.toString();
+//      }
+//  }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -49,8 +49,10 @@
 {
    // Constants -----------------------------------------------------
 
-   private final Logger log = Logger.getLogger(MinaConnector.class);
+   private static final Logger log = Logger.getLogger(MinaConnector.class);
    
+   private static boolean trace = log.isTraceEnabled();
+   
    // Attributes ----------------------------------------------------
 
    private Location location;
@@ -67,6 +69,8 @@
 
    private List<RemotingSessionListener> listeners = new ArrayList<RemotingSessionListener>();
    private IoServiceListenerAdapter ioListener;
+   
+   private MinaHandler handler;
 
    // Static --------------------------------------------------------
 
@@ -103,6 +107,8 @@
 
       this.connector = new NioSocketConnector();
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
+      
+      connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
 
       // addMDCFilter(filterChain);
       if (connectionParams.isSSLEnabled())
@@ -118,9 +124,8 @@
          }
       }
       addCodecFilter(filterChain);
-      // addLoggingFilter(filterChain);
-      addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
-            connectionParams.getKeepAliveTimeout(), this);
+//      addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
+//            connectionParams.getKeepAliveTimeout(), this);
       connector.getSessionConfig().setTcpNoDelay(connectionParams.isTcpNoDelay());
       int receiveBufferSize = connectionParams.getTcpReceiveBufferSize();
       if (receiveBufferSize != -1)
@@ -142,27 +147,28 @@
    {
       if (session != null && session.isConnected())
       {
-         return new MinaSession(session);
+         return new MinaSession(session, handler);
       }
       
       threadPool = Executors.newCachedThreadPool();
-      connector.setHandler(new MinaHandler(dispatcher, threadPool, this, false));
+      handler = new MinaHandler(dispatcher, threadPool, this, false);
+      connector.setHandler(handler);
       InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
       ConnectFuture future = connector.connect(address);
       connector.setDefaultRemoteAddress(address);
       ioListener = new IoServiceListenerAdapter();
       connector.addListener(ioListener);
-
+      
       future.awaitUninterruptibly();
       if (!future.isConnected())
       {
          throw new IOException("Cannot connect to " + address.toString());
       }
       this.session = future.getSession();
-      Packet packet = new Ping(session.getId());
-      session.write(packet);
+//      Packet packet = new Ping(session.getId());
+//      session.write(packet);
       
-      return new MinaSession(session);
+      return new MinaSession(session, handler);
    }
 
    public boolean disconnect()
@@ -208,7 +214,7 @@
 
       listeners.add(listener);
 
-      if (log.isTraceEnabled())
+      if (trace)
          log.trace("added listener " + listener + " to " + this);
    }
 
@@ -219,7 +225,7 @@
 
       listeners.remove(listener);
 
-      if (log.isTraceEnabled())
+      if (trace)
          log.trace("removed listener " + listener + " from " + this);
    }
 
@@ -269,25 +275,25 @@
 
       public void serviceActivated(IoService service)
       {
-         if (log.isTraceEnabled())
+         if (trace)
             log.trace("activated " + service);
       }
 
       public void serviceDeactivated(IoService service)
       {
-         if (log.isTraceEnabled())
+         if (trace)
             log.trace("deactivated " + service);
       }
 
       public void serviceIdle(IoService service, IdleStatus idleStatus)
       {
-         if (log.isTraceEnabled())
+         if (trace)
             log.trace("idle " + service + ", status=" + idleStatus);
       }
 
       public void sessionCreated(IoSession session)
       {
-         if (log.isTraceEnabled())
+         if (trace)
             log.trace("created session " + session);
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -10,10 +10,10 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
@@ -21,7 +21,6 @@
 import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.util.OrderedExecutorFactory;
 
 /**
@@ -31,12 +30,15 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class MinaHandler extends IoHandlerAdapter implements PacketHandlerRegistrationListener
+public class MinaHandler extends IoHandlerAdapter implements
+      PacketHandlerRegistrationListener
 {
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MinaHandler.class);
 
+   private static boolean trace = log.isTraceEnabled();
+
    // Attributes ----------------------------------------------------
 
    private final PacketDispatcher dispatcher;
@@ -47,22 +49,24 @@
 
    private final OrderedExecutorFactory executorFactory;
 
-   //Note! must use ConcurrentMap here to avoid race condition
+   // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-   
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   public MinaHandler(final PacketDispatcher dispatcher, final ExecutorService executorService,
-   		             final CleanUpNotifier failureNotifier, final boolean closeSessionOnExceptionCaught)
+   public MinaHandler(final PacketDispatcher dispatcher,
+         final ExecutorService executorService,
+         final CleanUpNotifier failureNotifier,
+         final boolean closeSessionOnExceptionCaught)
    {
-      assert dispatcher!= null;
+      assert dispatcher != null;
       assert executorService != null;
-      
+
       this.dispatcher = dispatcher;
       this.failureNotifier = failureNotifier;
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
-      
+
       this.executorFactory = new OrderedExecutorFactory(executorService);
       this.dispatcher.setListener(this);
    }
@@ -75,7 +79,7 @@
    {
       // do nothing on registration
    }
-   
+
    public void handlerUnregistered(final long handlerID)
    {
       executors.remove(handlerID);
@@ -88,12 +92,12 @@
          throws Exception
    {
       log.error("caught exception " + cause + " for session " + session, cause);
-      
+
       if (failureNotifier != null)
       {
          long serverSessionID = session.getId();
-         MessagingException me =
-         	new MessagingException(MessagingException.INTERNAL_ERROR, "unexpected exception");
+         MessagingException me = new MessagingException(
+               MessagingException.INTERNAL_ERROR, "unexpected exception");
          me.initCause(cause);
          failureNotifier.fireCleanup(serverSessionID, me);
       }
@@ -102,43 +106,29 @@
          session.close();
       }
    }
-   
+
    @Override
    public void messageReceived(final IoSession session, final Object message)
-         throws Exception
+   throws Exception
    {
-      if (message instanceof Ping)
-      {
-         if (log.isTraceEnabled())
-            log.trace("received ping " + message);
-         // response is handled by the keep-alive filter.
-         // do nothing
-         return;
-      } 
-      
-      if (!(message instanceof Packet))
-      {
-         throw new IllegalArgumentException("Unknown message type: " + message);
-      }
-      
       final Packet packet = (Packet) message;
       long executorID = packet.getExecutorID();
-      
+
       Executor executor = executors.get(executorID);
       if (executor == null)
       {
          executor = executorFactory.getOrderedExecutor();
-         
+
          Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-         
+
          if (oldExecutor != null)
          {
-         	//Avoid race
-         	executor = oldExecutor;
+            //Avoid race
+            executor = oldExecutor;
          }
       }
-      
-      executor.execute(new Runnable() 
+
+      executor.execute(new Runnable()
       {
          public void run()
          {
@@ -151,9 +141,62 @@
             }
          }
       });
-      
+
    }
 
+   private final int high = 2000;
+
+   private final int low = 500;
+
+   private AtomicInteger count = new AtomicInteger(0);
+
+   private volatile boolean blocked = true;
+
+   @Override
+   public void messageSent(final IoSession session, final Object message)
+         throws Exception
+   {
+      int newcount = count.decrementAndGet();
+
+      if (blocked)
+      {
+         if (newcount == low)
+         {
+            blocked = false;
+
+            // log.info("unblocking");
+
+            synchronized (this)
+            {
+               this.notify();
+            }
+         }
+      }
+
+   }
+
+   public void acquireSemaphore() throws Exception
+   {
+      // if (!sem.tryAcquire(5000, TimeUnit.MILLISECONDS))
+      // {
+      // throw new IllegalStateException("Timed out");
+      // }
+      int newcount = count.incrementAndGet();
+
+      if (newcount == high)
+      {
+         blocked = true;
+
+         // log.info("blocking");
+
+         synchronized (this)
+         {
+            this.wait(5000);
+         }
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -164,22 +207,22 @@
          throws Exception
    {
       PacketReturner returner;
-      
+
       if (packet.getResponseTargetID() != EmptyPacket.NO_ID_SET)
-      {      
+      {
          returner = new PacketReturner()
          {
             public void send(Packet p) throws Exception
             {
                dispatcher.callFilters(p);
-               session.write(p);            
+               session.write(p);
             }
-            
+
             public long getSessionID()
             {
                return session.getId();
             }
-            
+
             public String getRemoteAddress()
             {
                return session.getRemoteAddress().toString();
@@ -191,11 +234,10 @@
          returner = null;
       }
 
-      if (log.isTraceEnabled())
-         log.trace("received packet " + packet);
+      if (trace) log.trace("received packet " + packet);
 
       dispatcher.dispatch(packet, returner);
    }
-   
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -10,7 +10,6 @@
 import static org.jboss.messaging.core.remoting.TransportType.INVM;
 import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 
 import java.net.InetSocketAddress;
@@ -128,6 +127,9 @@
             && acceptor == null)
       {
          acceptor = new NioSocketAcceptor();
+         
+         acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
+         
          DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
 
          // addMDCFilter(filterChain);
@@ -139,9 +141,6 @@
                         .getTrustStorePassword());
          }
          addCodecFilter(filterChain);
-         // addLoggingFilter(filterChain);
-         addKeepAliveFilter(filterChain, factory,
-               config.getKeepAliveInterval(), config.getKeepAliveTimeout(), this);
 
          // Bind
          acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
@@ -258,7 +257,8 @@
 
    // Inner classes -------------------------------------------------
 
-   private final class MinaSessionListener implements IoServiceListener {
+   private final class MinaSessionListener implements IoServiceListener
+   {
 
       public void serviceActivated(IoService service)
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -7,6 +7,7 @@
 package org.jboss.messaging.core.remoting.impl.mina;
 
 import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.Packet;
 
@@ -20,22 +21,26 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(MinaConnector.class);
+   
+   
    // Attributes ----------------------------------------------------
 
    private final IoSession session;
 
-   //private AtomicLong correlationCounter;
+   private MinaHandler handler;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public MinaSession(IoSession session)
+   public MinaSession(IoSession session, MinaHandler handler)
    {
       assert session != null;
 
       this.session = session;
-     // correlationCounter = new AtomicLong(0);
+  
+      this.handler = handler;
    }
 
    // Public --------------------------------------------------------
@@ -46,7 +51,16 @@
    }
 
    public void write(Packet packet)
-   {
+   {      
+      try
+      {
+         handler.acquireSemaphore();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to acquire sem", e);
+      }
+      
       session.write(packet);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -100,7 +100,7 @@
 //			}
 		}
 		
-		session.send(message);      
+		session.send(message);  		
 	}
 
 	public void sendCredits() throws Exception

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -55,6 +55,22 @@
 
    // Constructors --------------------------------------------------
 
+   public static void main(String[] args)
+   {
+      try
+      {
+         MessageProducerTest test = new MessageProducerTest();
+         
+         test.setUp();
+         test.testSpeed3();
+         test.tearDown();
+      }
+      catch (Throwable t)
+      {
+         t.printStackTrace();
+      }
+   }
+   
    public MessageProducerTest(String name)
    {
       super(name);
@@ -197,75 +213,82 @@
          pconn.close();
       }
    }
-//   
-//   public void testSpeed2() throws Exception
-//   {
-//      Connection pconn = null;      
-//
-//      try
-//      {
-//         pconn = cf.createConnection();
-//
-//         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-//         MessageProducer p = ps.createProducer(queue1);
-//         
-//         MessageConsumer cons = ps.createConsumer(queue1);
-//         
-//         pconn.start();
-//         
-//         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//         
-//         p.setDisableMessageID(true);
-//         p.setDisableMessageTimestamp(true);
-//
-//         final int numMessages = 100000;
-//
-//         long start = System.currentTimeMillis();
-//
-//         BytesMessage msg = ps.createBytesMessage();
-//         
-//         msg.writeBytes(new byte[1000]);
-//         
-//         final CountDownLatch latch = new CountDownLatch(1);
-//         
-//         class MyListener implements MessageListener
-//         {
-//            int count;
-//
-//            public void onMessage(Message msg)
-//            {
-//               count++;
-//               
-//               if (count == numMessages)
-//               {
-//                  latch.countDown();
-//               }
-//            }            
-//         }
-//         
-//         cons.setMessageListener(new MyListener());
-//         
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            p.send(msg);
-//         }
-//         
-//         latch.await();
-//         
-//         long end = System.currentTimeMillis();
-//
-//         double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-//         log.info("rate " + actualRate + " msgs /sec");
-//
-//      }
-//      finally
-//      {
-//         pconn.close();
-//      }
-//   }
-//   
+   
+   public void testSpeed2() throws Exception
+   {
+      Connection pconn = null;      
+
+      try
+      {
+         pconn = cf.createConnection();
+
+         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+         MessageProducer p = ps.createProducer(queue1);
+         
+         MessageConsumer cons = ps.createConsumer(queue1);
+         
+         pconn.start();
+         
+         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         p.setDisableMessageID(true);
+         p.setDisableMessageTimestamp(true);
+
+         final int numMessages = 10000;
+
+         long start = System.currentTimeMillis();
+
+         BytesMessage msg = ps.createBytesMessage();
+         
+         msg.writeBytes(new byte[1000]);
+         
+         final CountDownLatch latch = new CountDownLatch(1);
+         
+         class MyListener implements MessageListener
+         {
+            int count;
+
+            public void onMessage(Message msg)
+            {
+               count++;
+               
+               if (count == numMessages)
+               {
+                  latch.countDown();
+               }
+            }            
+         }
+         
+         cons.setMessageListener(new MyListener());
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(msg);
+         }
+         
+         latch.await();
+         
+         long end = System.currentTimeMillis();
+
+         double actualRate = 1000 * (double)numMessages / ( end - start);
+
+         log.info("rate " + actualRate + " msgs /sec");
+
+      }
+      finally
+      {
+         pconn.close();
+      }
+   }
+   
+   public MessageProducerTest()
+   {
+      super("MessageProducerTest");
+   }
+   
+
+   
    public void testSpeed3() throws Exception
    {
       Connection pconn = null;      
@@ -310,24 +333,37 @@
          
          cons.setMessageListener(new MyListener());
          
+         long start = System.currentTimeMillis();
+         
+         
          for (int i = 0; i < numMessages; i++)
          {
             p.send(msg);
          }
          
-         long start = System.currentTimeMillis();
+         
+         long end = System.currentTimeMillis();
 
+         double actualRate = 1000 * (double)numMessages / ( end - start);
          
+         log.info("send rate " + actualRate + " msgs /sec");
+
+         log.info("Sleeping");
+         
+         Thread.sleep(10000);
+         
+         log.info("Let's go....");
+         
          pconn.start();
          
          
          latch.await();
          
-         long end = System.currentTimeMillis();
+         end = System.currentTimeMillis();
 
-         double actualRate = 1000 * (double)numMessages / ( end - start);
+         actualRate = 1000 * (double)numMessages / ( end - start);
 
-         log.info("rate " + actualRate + " msgs /sec");
+         log.info("consume rate " + actualRate + " msgs /sec");
 
       }
       finally

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -43,6 +43,7 @@
       super.setUp();
 
       conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
       conf.setTransport(TransportType.TCP);
       conf.setHost("localhost");      
       server = new MessagingServerImpl(conf);
@@ -56,35 +57,35 @@
       
       super.tearDown();
    }
+//   
+//   
+//   public void testCoreClient() throws Exception
+//   {
+//      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+//            
+//      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+//      ClientConnection conn = cf.createConnection();
+//      
+//      ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
+//      session.createQueue(QUEUE, QUEUE, null, false, false);
+//      
+//      ClientProducer producer = session.createProducer(QUEUE);
+//
+//      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+//            System.currentTimeMillis(), (byte) 1);
+//      message.getBody().putString("testINVMCoreClient");
+//      producer.send(message);
+//
+//      ClientConsumer consumer = session.createConsumer(QUEUE);
+//      conn.start();
+//      
+//      message = consumer.receive(1000);
+//      
+//      assertEquals("testINVMCoreClient", message.getBody().getString());
+//      
+//      conn.close();
+//   }
    
-   
-   public void testCoreClient() throws Exception
-   {
-      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-            
-      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-      ClientConnection conn = cf.createConnection();
-      
-      ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
-      session.createQueue(QUEUE, QUEUE, null, false, false);
-      
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
-            System.currentTimeMillis(), (byte) 1);
-      message.getBody().putString("testINVMCoreClient");
-      producer.send(message);
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-      conn.start();
-      
-      message = consumer.receive(1000);
-      
-      assertEquals("testINVMCoreClient", message.getBody().getString());
-      
-      conn.close();
-   }
-   
    public static void main(String[] args)
    {
       try
@@ -107,6 +108,7 @@
             
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
       cf.setDefaultConsumerWindowSize(-1);
+   //   cf.setDefaultProducerMaxRate(30000);
       
       ClientConnection conn = cf.createConnection();
       
@@ -118,9 +120,9 @@
       ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
             System.currentTimeMillis(), (byte) 1);
       
-      //byte[] bytes = new byte[1000];
+      byte[] bytes = new byte[1000];
       
-      //message.getBody().putBytes(bytes);
+      message.getBody().putBytes(bytes);
       
       message.getBody().flip();
       
@@ -128,9 +130,9 @@
       ClientConsumer consumer = session.createConsumer(QUEUE);
             
       final CountDownLatch latch = new CountDownLatch(1);
+//      
+      final int numMessages = 50000;
       
-      final int numMessages = 100000;
-      
       class MyHandler implements MessageHandler
       {
          int count;
@@ -156,15 +158,29 @@
       }
 
       consumer.setMessageHandler(new MyHandler());
+      
+      
             
       //System.out.println("Waiting 10 secs");
       
      // Thread.sleep(10000);
       
+      
+      
       System.out.println("Starting");
       
-      conn.start();
-                  
+      
+      //Warmup
+      for (int i = 0; i < 50000; i++)
+      {      
+         producer.send(message);
+      }
+//      
+//      System.out.println("Waiting 10 secs");
+//      
+//      Thread.sleep(10000);
+      
+      
       long start = System.currentTimeMillis();
             
       for (int i = 0; i < numMessages; i++)
@@ -172,6 +188,15 @@
          producer.send(message);
       }
       
+      
+      
+            
+      //long end = System.currentTimeMillis();
+      
+      //double actualRate = 1000 * (double)numMessages / ( end - start);
+      
+      //System.out.println("Send Rate is " + actualRate);
+      
 //      long end = System.currentTimeMillis();
 //      
 //      double actualRate = 1000 * (double)numMessages / ( end - start);
@@ -182,7 +207,7 @@
       
      // start = System.currentTimeMillis();
       
-      latch.await();
+ //     latch.await();
       
 //      long end = System.currentTimeMillis();
 //
@@ -192,18 +217,39 @@
 
       //conn.start();
       
-
-      //start = System.currentTimeMillis();
-
+      //System.out.println("Waiting 10 secs");
       
       
+      
       long end = System.currentTimeMillis();
       
       double actualRate = 1000 * (double)numMessages / ( end - start);
       
-      System.out.println(" consume Rate is " + actualRate);
+      System.out.println("Rate is " + actualRate);
       
+      //Thread.sleep(10000);
+      
+      
+      //      conn.start();
 //      
+//      
+//      start = System.currentTimeMillis();
+//
+      
+//      conn.start();
+////      
+//      start = System.currentTimeMillis();
+////      
+//      latch.await();
+////            
+////      
+//      end = System.currentTimeMillis();
+//      
+//      actualRate = 1000 * (double)numMessages / ( end - start);
+//      
+//      System.out.println("Rate is " + actualRate);
+      
+//      
 //      message = consumer.receive(1000);
 //      
 //      assertEquals("testINVMCoreClient", message.getBody().getString());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/FilterChainSupportTest.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -67,29 +67,6 @@
       trustStorePassword = keystorePassword;
    }
 
-   public void testAddKeepAliveFilterWithIncorrectParameters() throws Exception
-   {
-      int keepAliveInterval = 5; // seconds
-      int keepAliveTimeout = 10; // seconds
-
-      DefaultIoFilterChainBuilder filterChain = new DefaultIoFilterChainBuilder();
-      KeepAliveFactory factory = createMock(KeepAliveFactory.class);
-      CleanUpNotifier notifier = createMock(CleanUpNotifier.class);
-
-      replay(factory, notifier);
-
-      try
-      {
-         FilterChainSupport.addKeepAliveFilter(filterChain, factory,
-               keepAliveInterval, keepAliveTimeout, notifier);
-         fail("the interval must be greater than the timeout");
-      } catch (IllegalArgumentException e)
-      {
-      }
-
-      verify(factory, notifier);
-   }
-
    public void testSSLFilter() throws Exception
    {
       InetSocketAddress address = new InetSocketAddress("localhost", 9091);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-13 21:47:07 UTC (rev 4185)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-14 08:05:10 UTC (rev 4186)
@@ -43,17 +43,6 @@
 
    // Public --------------------------------------------------------
 
-   public void testReceiveNotAbstractPacket() throws Exception
-   {
-      try
-      {
-         handler.messageReceived(null, new Object());
-         fail();
-      } catch (IllegalArgumentException e)
-      {
-      }
-   }
-
    public void testReceiveUnhandledAbstractPacket() throws Exception
    {
       TextPacket packet = new TextPacket("testReceiveUnhandledAbstractPacket");




More information about the jboss-cvs-commits mailing list