[infinispan-commits] Infinispan SVN: r1945 - in branches/4.1.x/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport/tcp and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jul 1 06:02:10 EDT 2010


Author: mircea.markus
Date: 2010-07-01 06:02:10 -0400 (Thu, 01 Jul 2010)
New Revision: 1945

Added:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
Modified:
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
   branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties
Log:
[ISPN-473]-Hot Rod client max concurrent connections is too small


Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -68,14 +68,14 @@
  *  <li>maxActive - controls the maximum number of connections per server that are allocated (checked out to client threads, or idle in
  * the pool) at one time. When non-positive, there is no limit to the number of connections per server. When maxActive
  * is reached, the connection pool for that server is said to be exhausted. The default setting for this parameter is
- * 2.</li>
+ * -1, i.e. there is no limit.</li>
  * <li>maxTotal - sets a global limit on the number of persistent connections that can be in circulation within the combined set of
  * servers. When non-positive, there is no limit to the total number of persistent connections in circulation. When
  * maxTotal is exceeded, all connection pools are exhausted. The default setting for this parameter is -1 (no limit).
  * </li>
  *
  * <li>maxIdle - controls the maximum number of idle persistent connections, per server, at any time. When negative, there is no limit
- * to the number of connections that may be idle per server. The default setting for this parameter is 2.</li>
+ * to the number of connections that may be idle per server. The default setting for this parameter is -1.</li>
  *
  * <li>
  *   whenExhaustedAction - specifies what happens when asking for a connection from a server's pool, and that pool is exhausted. Possible values:
@@ -95,7 +95,7 @@
  * <ul>
  *  <li>timeBetweenEvictionRunsMillis - indicates how long the eviction thread should sleep before "runs" of examining idle
  *  connections. When non-positive, no eviction thread will be launched. The default setting for this parameter is
- *  300000(5 minutes) </li>
+ *  120000 (2 minutes) </li>
  *   <li> minEvictableIdleTimeMillis - specifies the minimum amount of time that a connection may sit idle in the pool before it
  *   is eligible for eviction due to idle time. When non-positive, no connection will be dropped from the pool due to
  *   idle time alone. This setting has no effect unless timeBetweenEvictionRunsMillis > 0. The default setting for this
@@ -107,7 +107,7 @@
  *   <li>minIdle - sets a target value for the minimum number of idle connections (per server) that should always be available.
  *   If this parameter is set to a positive number and timeBetweenEvictionRunsMillis > 0, each time the idle connection
  *   eviction thread runs, it will try to create enough idle instances so that there will be minIdle idle instances
- *   available for each server.  The default setting for this parameter is 0. </li>
+ *   available for each server.  The default setting for this parameter is 1. </li>
  * </ul>
  * </li>
  * <li>

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -19,16 +19,16 @@
 
    public PropsKeyedObjectPoolFactory(KeyedPoolableObjectFactory factory, Properties props) {
       super(factory);
-      _maxActive = intProp(props, "maxActive", 2);
+      _maxActive = intProp(props, "maxActive", -1);
       _maxTotal = intProp(props, "maxTotal", -1);
-      _maxIdle = intProp(props, "maxIdle", 2);
+      _maxIdle = intProp(props, "maxIdle", -1);
       _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", (int) GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK);
       _testOnBorrow = booleanProp(props, "testOnBorrow", false);
       _testOnReturn = booleanProp(props, "testOnReturn", false);
-      _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", 5 * 60 * 1000);
-      _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 30 * 60 * 1000);
+      _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", 2 * 60 * 1000);
+      _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 5 * 60 * 1000);
       _testWhileIdle = booleanProp(props, "testWhileIdle", true);
-      _minIdle = intProp(props, "minIdle", 0);
+      _minIdle = intProp(props, "minIdle", 1);
       _lifo = booleanProp(props, "lifo", true);
    }
 

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -12,6 +12,7 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Transport implementation based on TCP.
@@ -22,10 +23,14 @@
 @ThreadSafe
 public class TcpTransport extends AbstractTransport {
 
+   //needed for debugging
+   private static AtomicLong ID_COUNTER = new AtomicLong(0);
+
    private static Log log = LogFactory.getLog(TcpTransport.class);
 
    private final Socket socket;
    private final InetSocketAddress serverAddress;
+   private final long id = ID_COUNTER.incrementAndGet();
 
    public TcpTransport(InetSocketAddress serverAddress, TransportFactory transportFactory) {
       super(transportFactory);
@@ -179,6 +184,7 @@
       return "TcpTransport{" +
             "socket=" + socket +
             ", serverAddress=" + serverAddress +
+            ", id =" + id +
             "} ";
    }
 
@@ -212,4 +218,8 @@
          log.warn("Issues closing transport: " + this, e);
       }
    }
+
+   public boolean isValid() {
+      return !socket.isClosed();
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -98,12 +98,23 @@
    @Override
    public void releaseTransport(Transport transport) {
       TcpTransport tcpTransport = (TcpTransport) transport;
-      try {
-         connectionPool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
-      } catch (Exception e) {
-         log.warn("Could not release connection: " + tcpTransport, e);
-      } finally {
-         logConnectionInfo(tcpTransport.getServerAddress());
+      if (!tcpTransport.isValid()) {
+         try {
+            if (log.isTraceEnabled()) {
+               log.info("Dropping connection as it is no longer valid: " + tcpTransport);
+            }
+            connectionPool.invalidateObject(tcpTransport.getServerAddress(), tcpTransport);
+         } catch (Exception e) {
+            log.warn("Could not invalidate connection: " + tcpTransport, e);
+         }
+      } else {
+         try {
+            connectionPool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
+         } catch (Exception e) {
+            log.warn("Could not release connection: " + tcpTransport, e);
+         } finally {
+            logConnectionInfo(tcpTransport.getServerAddress());
+         }
       }
    }
 
@@ -184,6 +195,9 @@
 
    @Override
    public int getTransportCount() {
+      if (Thread.currentThread().isInterrupted()) { 
+         return -1;
+      }
       if (connectionPool.getMaxActive() > 0) {
          return connectionPool.getMaxActive() * servers.size();
       } else {

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -65,6 +65,9 @@
 
    @Override
    public void destroyObject(Object key, Object obj) throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("About to destroy tcp transport: "+ obj);
+      }
       TcpTransport transport = (TcpTransport) obj;
       transport.destroy();
    }

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -0,0 +1,75 @@
+package org.infinispan.client.hotrod;
+
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (testName = "client.hotrod.HeavyLoadConnectionPoolingTest", groups = "functional")
+public class HeavyLoadConnectionPoolingTest extends SingleCacheManagerTest {
+   private HotRodServer hotRodServer;
+   private RemoteCacheManager remoteCacheManager;
+   private RemoteCache<Object, Object> remoteCache;
+   private GenericKeyedObjectPool connectionPool;
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+      cache = cacheManager.getCache();
+
+      hotRodServer = TestHelper.startHotRodServer(cacheManager);
+
+      Properties hotrodClientConf = new Properties();
+      hotrodClientConf.put("hotrod-servers", "localhost:"+hotRodServer.getPort());
+      hotrodClientConf.put("timeBetweenEvictionRunsMillis", "3000");
+      hotrodClientConf.put("minEvictableIdleTimeMillis", "1000");
+      remoteCacheManager = new RemoteCacheManager(hotrodClientConf);
+      remoteCache = remoteCacheManager.getCache();
+
+      TcpTransportFactory tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+      connectionPool = (GenericKeyedObjectPool) TestingUtil.extractField(tcpConnectionFactory, "connectionPool");
+
+      return cacheManager;
+   }
+
+   public void testHeavyLoad() throws InterruptedException {
+      List<WorkerThread> workers = new ArrayList<WorkerThread>();
+
+      //create 20 threads and do work with them
+      for (int i =0; i < 20; i++) {
+         WorkerThread workerThread = new WorkerThread(remoteCache);
+         workers.add(workerThread);
+         workerThread.stress();
+      }
+      while (!(connectionPool.getNumActive() > 15)) {
+         Thread.sleep(10);
+      }
+
+      for (WorkerThread wt: workers) {
+         wt.interrupt();
+         wt.waitToFinish();
+      }
+      //now wait for the idle thread to wake up and clean them
+      for (int i = 0; i < 50; i++) {
+         System.out.println("connectionPool = " + connectionPool.getNumActive());
+         if (connectionPool.getNumIdle() == 1) break;
+         Thread.sleep(1000);
+      }
+      assertEquals(1, connectionPool.getNumIdle());
+      assertEquals(0, connectionPool.getNumActive());
+   }
+}

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java	2010-07-01 10:02:10 UTC (rev 1945)
@@ -3,6 +3,7 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -20,15 +21,17 @@
    public static final String NULL = "_null_";
    public static final String PUT_SYNC = "_put_sync_";
    public static final String PUT_ASYNC = "_put_async_";
+   public static final String STRESS = "_stress_";
 
-   final RemoteCache<String, String> remoteCache;
+   final RemoteCache remoteCache;
    final BlockingQueue send = new ArrayBlockingQueue(1);
    final BlockingQueue receive = new ArrayBlockingQueue(1);
 
    volatile String key;
    volatile String value;
+   volatile boolean finished = false;
 
-   public WorkerThread(RemoteCache<String, String> remoteCache) {
+   public WorkerThread(RemoteCache remoteCache) {
       super("WorkerThread-" + WORKER_INDEX.getAndIncrement());
       this.remoteCache = remoteCache;
       start();
@@ -45,19 +48,40 @@
                trace("exiting!");
                return;
             }
-            Object result = remoteCache.put(key, value);
-            trace("Added to the cache (" + key + "," + value + ") and returning " + result);
-            if (PUT_SYNC.equals(o)) {
-               receive.put(result == null ? NULL : result);
-               trace("Que now has: " + receive.peek());
+            if (PUT_SYNC.equals(o) || PUT_ASYNC.equals(o)) {
+               Object result = remoteCache.put(key, value);
+               trace("Added to the cache (" + key + "," + value + ") and returning " + result);
+               if (PUT_SYNC.equals(o)) {
+                  receive.put(result == null ? NULL : result);
+                  trace("Que now has: " + receive.peek());
+               }
             }
+            if (STRESS.equals(o)) {
+               stress_();
+            }
          } catch (InterruptedException e) {
             e.printStackTrace();
             throw new IllegalStateException(e);
+         } finally {
+            finished = true;
          }
       }
    }
 
+   private void stress_() {
+      Random rnd = new Random();
+      while (!isInterrupted()) {
+         remoteCache.put(rnd.nextLong(), rnd.nextLong());
+         System.out.println(getName() + " Finished put.");
+         try {
+            Thread.sleep(50);
+         } catch (InterruptedException e) {
+            interrupted();
+            return;
+         }
+      }
+   }
+
    /**
     * Only returns when this thread added the given key value.
     */
@@ -107,7 +131,26 @@
       }
    }
 
+   public void stress() {
+      try {
+         send.put(STRESS);
+      } catch (InterruptedException e) {
+         e.printStackTrace();
+         throw new IllegalStateException(e);
+      }
+   }
+
    private void trace(String message) {
       log.trace("Worker: " + message);
    }
+
+   public void waitToFinish() {
+      while (!finished) {
+         try {
+            Thread.sleep(200);
+         } catch (InterruptedException e) {
+            Thread.interrupted();
+         }
+      }
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties	2010-07-01 07:31:37 UTC (rev 1944)
+++ branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties	2010-07-01 10:02:10 UTC (rev 1945)
@@ -3,14 +3,14 @@
 
 
 ##below is connection pooling config
-maxActive=2
+maxActive= -1
 maxTotal = -1
-maxIdle = 2
+maxIdle = -1
 whenExhaustedAction = 1
-timeBetweenEvictionRunsMillis=300000
-minEvictableIdleTimeMillis=1800000
+timeBetweenEvictionRunsMillis=120000
+minEvictableIdleTimeMillis=300000
 testWhileIdle = true
-minIdle = 0
+minIdle = 1
 
 asyn-executor-factory=org.infinispan.client.hotrod.DefaultAsyncExecutorFactory
 default-executor-factory.poolSize = 1;



More information about the infinispan-commits mailing list