[infinispan-commits] Infinispan SVN: r2013 - in branches/4.1.x/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/protocol and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Jul 12 04:09:33 EDT 2010


Author: mircea.markus
Date: 2010-07-12 04:09:33 -0400 (Mon, 12 Jul 2010)
New Revision: 2013

Added:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStartStopTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/PingOnStartupTest.java
Modified:
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
   branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties
Log:
ISPN-505-Hot Rod client to send a ping as part of connection to get topology


Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -3,6 +3,7 @@
 import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory;
 import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
+import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
 import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsImpl;
 import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
 import org.infinispan.client.hotrod.impl.SerializationMarshaller;
@@ -61,7 +62,10 @@
  * </li>
  *  <li>
  * tcp-no-delay - TCP no delay flag switch. Defaults to true.
- * </li>
+    * </li>
+ *  <li>
+ * ping-on-startup - if true, a ping request is sent to a back end server in order to fetch cluster's topology. True by default.
+    </li>
  * <br/>
  * <i>below is connection pooling config</i>:
  * <p/>
@@ -132,7 +136,7 @@
 
    public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
 
-   public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
+   public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod.client.servers";
 
 
    private Properties props;
@@ -148,7 +152,7 @@
    /**
     * Builds a remote cache manager that relies on the provided {@link org.infinispan.client.hotrod.HotRodMarshaller} for marshalling
     * keys and values to be send over to the remote infinispan cluster.
-    * @param hotRodMarshaller marshaller implementatin to be used
+    * @param hotRodMarshaller marshaller implementation to be used. This will overwrite the marshaller from the properties (if any).
     * @param props other properties
     * @param start weather or not to start the manager on return from the constructor.
     */
@@ -325,8 +329,20 @@
          }
       }
       started = true;
+      ping();
    }
 
+   private void ping() {
+      String pingOnStartup = props.getProperty("ping-on-startup");
+      if (pingOnStartup != null && !Boolean.valueOf(pingOnStartup)) {
+         if (log.isTraceEnabled()) {
+            log.trace("Not pinging on startup as: 'ping-on-startup' = " + pingOnStartup);
+         }
+      } else {
+         transportFactory.ping();
+      }
+   }
+
    @Override
    public void stop() {
       transportFactory.destroy();

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -24,6 +24,8 @@
    static final AtomicLong MSG_ID = new AtomicLong();
    final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
 
+   public static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
+
    public static long writeHeader(Transport transport, short operationCode, byte[] cacheName, AtomicInteger topologyId, Flag... flags) {
       transport.writeByte(HotRodConstants.REQUEST_MAGIC);
       long messageId = MSG_ID.incrementAndGet();
@@ -158,4 +160,24 @@
          }
       }
    }
+
+   public static boolean ping(Transport transport, AtomicInteger topologyId) {
+      try {
+         long messageId = HotRodOperationsHelper.writeHeader(transport, HotRodConstants.PING_REQUEST, DEFAULT_CACHE_NAME_BYTES, topologyId);
+         short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE, topologyId);
+         if (respStatus == HotRodConstants.NO_ERROR_STATUS) {
+            if (log.isTraceEnabled())
+               log.trace("Successfully validated transport: " + transport);
+            return true;
+         } else {
+            if (log.isTraceEnabled())
+               log.trace("Unknown response status: " + respStatus);
+            return false;
+         }
+      } catch (Exception e) {
+         if (log.isTraceEnabled())
+            log.trace("Failed to validate transport: " + transport, e);
+         return false;
+      }
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -34,4 +34,6 @@
    boolean isTcpNoDelay();
 
    int getTransportCount();
+
+   void ping();
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -71,4 +71,9 @@
    public int getTransportCount() {
       return 1;
    }
+
+   @Override
+   public void ping() {
+      //ignoring
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -5,6 +5,8 @@
 import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
 import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashFactory;
+import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
+import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
 import org.infinispan.client.hotrod.impl.transport.Transport;
 import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
@@ -38,9 +40,11 @@
    private volatile ConsistentHash consistentHash;
    private volatile boolean tcpNoDelay;
    private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
+   private volatile AtomicInteger topologyId;
 
    @Override
    public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
+      this.topologyId = topologyId;
       hashFactory.init(props);
       servers = staticConfiguredServers;
       String balancerClass = props.getProperty("request-balancing-strategy", RoundRobinBalancingStrategy.class.getName());
@@ -205,6 +209,16 @@
       }
    }
 
+   @Override
+   public void ping() {
+      Transport transport = getTransport();
+      try {
+         HotRodOperationsHelper.ping(transport, topologyId);
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
    public RequestBalancingStrategy getBalancer() {
       return balancer;
    }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -1,9 +1,7 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
 import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
 import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
-import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -19,7 +17,6 @@
    private static final Log log = LogFactory.getLog(TransportObjectFactory.class);
    private final TcpTransportFactory tcpTransportFactory;
    private final AtomicInteger topologyId;
-   private static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
 
    public TransportObjectFactory(TcpTransportFactory tcpTransportFactory, AtomicInteger topologyId) {
       this.tcpTransportFactory = tcpTransportFactory;
@@ -42,26 +39,10 @@
    @Override
    public boolean validateObject(Object key, Object obj) {
       TcpTransport transport = (TcpTransport) obj;
-      try {
-         if (log.isTraceEnabled()) {
-            log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
-         }
-         long messageId = HotRodOperationsHelper.writeHeader(transport, HotRodConstants.PING_REQUEST, DEFAULT_CACHE_NAME_BYTES, topologyId);
-         short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE, topologyId);
-         if (respStatus == HotRodConstants.NO_ERROR_STATUS) {
-            if (log.isTraceEnabled())
-               log.trace("Successfully validated transport: " + transport);
-            return true;
-         } else {
-            if (log.isTraceEnabled())
-               log.trace("Unknown response status: " + respStatus);
-            return false;
-         }
-      } catch (Exception e) {
-         if (log.isTraceEnabled())
-            log.trace("Failed to validate transport: " + transport, e);
-         return false;
+      if (log.isTraceEnabled()) {
+         log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
       }
+      return HotRodOperationsHelper.ping(transport, topologyId);
    }
 
    @Override

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -84,6 +84,7 @@
       //Important: this only connects to one of the two servers!
       Properties props = new Properties();
       props.put("hotrod-servers", "localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
+      props.put("ping-on-startup", "false");
       remoteCacheManager = new RemoteCacheManager(props);
       remoteCache = remoteCacheManager.getCache();
 

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStartStopTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStartStopTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStartStopTest.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -0,0 +1,53 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(testName = "client.hotrod.HotRodServerStartStopTest", groups = "functional")
+public class HotRodServerStartStopTest extends MultipleCacheManagersTest {
+   private HotRodServer hotRodServer1;
+   private HotRodServer hotRodServer2;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      addClusterEnabledCacheManager(config);
+      addClusterEnabledCacheManager(config);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+      assert manager(0).getCache() != null;
+      assert manager(1).getCache() != null;
+
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+      cache(0).put("k","v");
+      assertEquals("v", cache(1).get("k"));
+   }
+
+   public void testTouchServer() {
+      RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer1.getPort(), true);
+      RemoteCache<Object, Object> remoteCache = remoteCacheManager.getCache();
+      remoteCache.put("k", "v");
+      assertEquals("v", remoteCache.get("k"));
+   }
+
+   @Test (dependsOnMethods = "testTouchServer")
+   public void testHrServerStop() {
+      hotRodServer1.stop();
+      hotRodServer2.stop();
+   }
+}

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/PingOnStartupTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/PingOnStartupTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/PingOnStartupTest.java	2010-07-12 08:09:33 UTC (rev 2013)
@@ -0,0 +1,84 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.util.Properties;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "client.hotrod.PingOnStartupTest")
+public class PingOnStartupTest extends MultipleCacheManagersTest {
+   private HotRodServer hotRodServer1;
+   private HotRodServer hotRodServer2;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      addClusterEnabledCacheManager(config);
+      addClusterEnabledCacheManager(config);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+      assert manager(0).getCache() != null;
+      assert manager(1).getCache() != null;
+
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+      cache(0).put("k","v");
+      assertEquals("v", cache(1).get("k"));
+   }
+
+   @AfterClass
+   @Override
+   protected void destroy() {
+      super.destroy();
+      try {
+         hotRodServer1.stop();
+         hotRodServer2.stop();
+      } catch (Exception e) {
+         //ignore
+      }
+   }
+
+   public void testTopologyFetched() {
+      Properties props = new Properties();
+      props.put("hotrod-servers", "localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
+      props.put("ping-on-startup", "true");
+      RemoteCacheManager remoteCacheManager = new RemoteCacheManager(props);
+
+      TcpTransportFactory tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+      try {
+         assertEquals(2, tcpConnectionFactory.getServers().size());
+      } finally {
+         remoteCacheManager.stop();
+      }
+   }
+
+   public void testTopologyNotFetched() {
+      Properties props = new Properties();
+      props.put("hotrod-servers", "localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
+      props.put("ping-on-startup", "false");
+      RemoteCacheManager remoteCacheManager = new RemoteCacheManager(props);
+
+      TcpTransportFactory tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+      try {
+         assertEquals(1, tcpConnectionFactory.getServers().size());
+      } finally {
+         remoteCacheManager.stop();
+      }
+   }
+}

Modified: branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties	2010-07-10 07:59:28 UTC (rev 2012)
+++ branches/4.1.x/client/hotrod-client/src/test/resources/hotrod-client.properties	2010-07-12 08:09:33 UTC (rev 2013)
@@ -1,7 +1,7 @@
 hotrod-servers=127.0.0.1:11311
 force-return-value=false
+ping-on-startup=true 
 
-
 ##bellow is connection pooling config
 maxActive= -1
 maxTotal = -1



More information about the infinispan-commits mailing list