[infinispan-commits] Infinispan SVN: r1634 - in trunk/client/hotrod-client/src/main/java: org and 7 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Mar 29 11:41:39 EDT 2010
Author: mircea.markus
Date: 2010-03-29 11:41:37 -0400 (Mon, 29 Mar 2010)
New Revision: 1634
Added:
trunk/client/hotrod-client/src/main/java/org/
trunk/client/hotrod-client/src/main/java/org/infinispan/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
ongoing work on hotrod client
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,21 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * Flags that change the default behaviour of a {@link RemoteCache} operation on a per-invocation basis.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public enum Flag {
+   FORCE_RETURN_VALUE(0x0800);
+
+   private final int flagInt; // assigned once by the constructor, never changed afterwards
+
+   Flag(int flagInt) {
+      this.flagInt = flagInt;
+   }
+
+   public int getFlagInt() {
+      return flagInt;
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,350 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides remote reference to a Hot Rod server/cluster. It implements {@link org.infinispan.Cache}, but given its
+ * nature (remote) some operations are not supported. All these unsupported operations are being overridden within this
+ * interface and documented as such.
+ * <p>
+ * <b>New operations</b>: besides the operations inherited from {@link org.infinispan.Cache}, RemoteCache also adds new
+ * operations to optimize/reduce network traffic: e.g. versioned put operation.
+ * <p>
+ * <b>Concurrency</b>: implementors of this interface will support multi-threaded access, similar to the way {@link
+ * org.infinispan.Cache} supports it.
+ * <p>
+ * <b>Return values</b>: previously existing values for certain {@link java.util.Map} operations are not returned,
+ * null is returned instead. E.g. {@link java.util.Map#put(Object, Object)} returns the previous value
+ * associated to the supplied key. In case of RemoteCache, this returns null.
+ * <p>
+ * <b>Synthetic operations</b>: aggregate operations are being implemented based on other Hot Rod operations.
+ * E.g. {@link java.util.Map#putAll(java.util.Map)} is implemented through multiple individual puts. This means
+ * that these operations are not atomic and that they are costly, e.g. as the number of network round-trips is not
+ * one, but the size of the added map. All these synthetic operations are documented as such.
+ * <p>
+ * <b>Changing default behavior through {@link org.infinispan.client.hotrod.Flag}s</b>: it is possible to change the default cache behaviour by using
+ * flags on a per-invocation basis.
+ * E.g.
+ * <pre>
+ * RemoteCache cache = getRemoteCache();
+ * Object oldValue = cache.withFlags(Flag.FORCE_RETURN_VALUE).put(aKey, aValue);
+ * </pre>
+ * In the previous example, using {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} will make the client also return the previously
+ * existing value associated with <tt>aKey</tt>. If this flag would not be present, Infinispan would return (by default)
+ * <tt>null</tt>. This is in order to avoid fetching a possibly large object from the remote server, which might not be
+ * needed. The flags as set by the {@link org.infinispan.client.hotrod.RemoteCache#withFlags(Flag...)} operation only apply for the very next
+ * operation executed <b>by the same thread</b> on the RemoteCache.
+ * <p>
+ * <b><a href="http://community.jboss.org/wiki/Eviction">Eviction and expiration</a></b>:
+ * Unlike a local {@link org.infinispan.Cache}, which allows specifying time values with any granularity (as defined by {@link TimeUnit}),
+ * HotRod only supports seconds as time units. If a different time unit is used instead, HotRod will transparently convert it to
+ * seconds, using {@link java.util.concurrent.TimeUnit#toSeconds(long)} method. This might result in loss of precision for
+ * values specified as nanos or milliseconds. <br/>
+ * Another fundamental difference is in the case of lifespan (naturally does NOT apply for max idle): If number of seconds is bigger than 30 days,
+ * this number of seconds is treated as UNIX time and so, represents the number of seconds since 1/1/1970. <br/>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface RemoteCache<K, V> extends Cache<K, V> {
+
+   /**
+    * Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
+    * <pre>
+    * VersionedValue ve = remoteCache.getVersioned(key);
+    * //some processing
+    * remoteCache.remove(key, ve.getVersion());
+    * </pre>
+    * Last call (remove) will make sure that the entry will only be removed if it hasn't been changed in between.
+    *
+    * @return true if the entry has been removed
+    * @see org.infinispan.client.hotrod.RemoteCache.VersionedValue
+    * @see #getVersioned(Object)
+    */
+   boolean remove(K key, long version);
+
+   /**
+    * @see #remove(Object, long)
+    */
+   NotifyingFuture<Boolean> removeAsync(Object key, long version);
+
+   /**
+    * Replaces the given value only if its version matches the supplied version. See {@link #remove(Object, long)} for a
+    * sample usage.
+    *
+    * @return true if the value has been replaced
+    * @see #getVersioned(Object)
+    * @see org.infinispan.client.hotrod.RemoteCache.VersionedValue
+    */
+   boolean replace(K key, V newValue, long version);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   boolean replace(K key, V newValue, long version, int lifespanSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds);
+
+
+   /**
+    * Returns the {@link org.infinispan.client.hotrod.RemoteCache.VersionedValue} associated to the supplied key param, or null if it doesn't exist.
+    */
+   VersionedValue<V> getVersioned(K key);
+
+
+   /**
+    * Operation might be supported for smart clients that will be able to register for topology changes.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void addListener(Object listener);
+
+   /**
+    * @throws UnsupportedOperationException
+    * @see #addListener(Object)
+    */
+   @Override
+   void removeListener(Object listener);
+
+   /**
+    * @throws UnsupportedOperationException
+    * @see #addListener(Object)
+    */
+   @Override
+   Set<Object> getListeners();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   int size();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean isEmpty();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean containsValue(Object value);
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Set<K> keySet();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Collection<V> values();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Set<Entry<K, V>> entrySet();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void evict(K key);
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Configuration getConfiguration();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean startBatch();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void endBatch(boolean successful);
+
+   /**
+    * This operation is not supported. Consider using {@link #remove(Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean remove(Object key, Object value);
+
+   /**
+    * This operation is not supported. Consider using {@link #removeAsync(Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> removeAsync(Object key, Object value);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V newValue);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit unit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long, int, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long, int)}
+    * instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long, int, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   AdvancedCache<K, V> getAdvancedCache();
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void compact();
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void putForExternalRead(K key, V value);
+
+   /**
+    * Synthetic operation. The client iterates over the set of keys and calls put for each one of them. This results in
+    * operation not being atomic (if a failure happens after few puts it is not rolled back) and costly (for each key in
+    * the parameter map a remote call is performed).
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit unit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> m);
+
+   /**
+    * Besides the value, also contains a version. To be used in versioned operations, e.g. {@link RemoteCache#remove(Object, long)}.
+    */
+   public static interface VersionedValue<V> {
+      public long getVersion();
+      public V getValue();
+   }
+
+   /** Sets the flags that apply to the very next operation executed by the calling thread on this RemoteCache. */
+   RemoteCache<K, V> withFlags(Flag... flags);
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,139 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.impl.HotrodOperations;
+import org.infinispan.client.hotrod.impl.HotrodOperationsImpl;
+import org.infinispan.client.hotrod.impl.HotrodMarshaller;
+import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
+import org.infinispan.client.hotrod.impl.SerializationMarshaller;
+import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.lifecycle.Lifecycle;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * Creates {@link RemoteCache} instances from a set of configuration properties. Once started it holds the
+ * {@link TransportFactory} and the marshaller class name that are used for every cache it creates.
+ * @author mmarkus
+ * @since 4.1
+ */
+public class RemoteCacheManager implements CacheContainer, Lifecycle {
+
+   private Properties props;
+   private TransportFactory transportFactory;
+   private String hotrodMarshaller;
+
+   /**
+    * Build a cache manager based on the supplied properties.
+    * TODO - add a list of all possible configuration parameters here
+    */
+   public RemoteCacheManager(Properties props, boolean start) {
+      this.props = props;
+      if (start) start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties, boolean)}, and it also starts the cache (start==true).
+    */
+   public RemoteCacheManager(Properties props) {
+      this(props, true); // delegate so the manager really is started, as documented above
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties, boolean)}, but it will try to lookup the config properties in the
+    * classpath, in a file named <tt>hotrod-client.properties</tt>.
+    * @param start whether or not to start the RemoteCacheManager
+    * @throws HotRodClientException if such a file cannot be found in the classpath
+    */
+   public RemoteCacheManager(boolean start) {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      InputStream stream = loader.getResourceAsStream("hotrod-client.properties");
+      loadFromStream(stream);
+      if (start) start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(boolean)} and it also starts the cache.
+    */
+   public RemoteCacheManager() {
+      this(true);
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties, boolean)}, but it will try to lookup the config properties in
+    * supplied URL.
+    * @param start whether or not to start the RemoteCacheManager
+    * @throws HotRodClientException if properties could not be loaded
+    */
+   public RemoteCacheManager(URL config, boolean start) {
+      try {
+         loadFromStream(config.openStream());
+      } catch (IOException e) {
+         throw new HotRodClientException("Could not read URL:" + config, e);
+      }
+      if (start) start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.net.URL, boolean)} and it also starts the cache (start==true).
+    * @param config location of the properties to load
+    */
+   public RemoteCacheManager(URL config) {
+      this(config, true);
+   }
+
+   private void loadFromStream(InputStream stream) {
+      if (stream == null) throw new HotRodClientException("Could not find hotrod-client.properties", null); // resource lookup yields null when absent
+      props = new Properties();
+      try {
+         props.load(stream);
+      } catch (IOException e) {
+         throw new HotRodClientException("Issues configuring from client hotrod-client.properties",e);
+      } finally {
+         try { stream.close(); } catch (IOException ignored) { /* load() leaves the stream open; nothing useful to do if close fails */ }
+      }
+   }
+
+   public <K,V> RemoteCache<K,V> getCache(String cacheName) {
+      return createRemoteCache(cacheName);
+   }
+
+   public <K,V> RemoteCache<K,V> getCache() {
+      return createRemoteCache(DefaultCacheManager.DEFAULT_CACHE_NAME);
+   }
+
+   @Override
+   public void start() {
+      String factory = props.getProperty("transport-factory"); //todo switch to Netty transport by default (second param)
+      if (factory == null)
+         throw new IllegalStateException("Missing required property: transport-factory!");
+      transportFactory = (TransportFactory) newInstance(factory);
+      transportFactory.init(props);
+      hotrodMarshaller = props.getProperty("marshaller", SerializationMarshaller.class.getName());
+   }
+
+   @Override
+   public void stop() {
+      if (transportFactory != null) transportFactory.destroy(); // nothing to release when start() never ran
+   }
+
+   private Object newInstance(String clazz) {
+      ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+      try {
+         return contextClassLoader.loadClass(clazz).newInstance();
+      } catch (Exception e) {
+         throw new HotRodClientException("Could not instantiate class: " + clazz, e);
+      }
+   }
+
+   private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName) {
+      HotrodMarshaller marshaller = (HotrodMarshaller) newInstance(hotrodMarshaller);
+      HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
+      return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,16 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * Reports the Hot Rod protocol version this client identifies itself with.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class Version {
+
+   private static final String PROTOCOL_VERSION = "1.0";
+
+   public static String getProtocolVersion() {
+      return "HotRod client, protocol version :".concat(PROTOCOL_VERSION);
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,13 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * Thrown when a remote cache is used after the manager it came from has been stopped (see the message below).
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class ClientDisconnectedException extends HotRodException {
+   public ClientDisconnectedException() {
+      super("Cannot call a method on the remote cache after RemoteCacheManager.stop() has been called.");
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,33 @@
+package org.infinispan.client.hotrod.exceptions;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * Exception raised by the Hot Rod client; optionally carries a message id and an error status code.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotRodClientException extends HotRodException {
+   private long messageId;
+   private int errorStatusCode;
+
+   public HotRodClientException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public HotRodClientException(String remoteMessage, long messageId, int errorStatusCode) {
+      super(remoteMessage);
+      this.messageId = messageId;
+      this.errorStatusCode = errorStatusCode;
+   }
+
+   /** Labels the output with this class' own name; both fields print as 0 when the two-argument constructor was used. */
+   @Override
+   public String toString() {
+      return "HotRodClientException{" +
+            "messageId=" + messageId +
+            ", errorStatusCode=" + errorStatusCode +
+            "} " + super.toString();
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,24 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * Unchecked base class of the Hot Rod client exceptions; every constructor simply forwards to {@link RuntimeException}.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotRodException extends RuntimeException {
+   public HotRodException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public HotRodException(Throwable cause) {
+      super(cause);
+   }
+
+   public HotRodException(String message) {
+      super(message);
+   }
+
+   public HotRodException() {
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,26 @@
+package org.infinispan.client.hotrod.exceptions;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * A {@link HotRodException} subtype; judging by its name it is meant for unacceptable responses (no usage visible here).
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class InvalidResponseException extends HotRodException {
+   public InvalidResponseException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public InvalidResponseException(Throwable cause) {
+      super(cause);
+   }
+
+   public InvalidResponseException(String message) {
+      super(message);
+   }
+
+   public InvalidResponseException() {
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,10 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * A {@link HotRodException} with no state of its own; presumably raised when an operation times out (TODO confirm usage).
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TimeoutException extends HotRodException {
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,44 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.Arrays;
+
+/**
+ * Pairs a value, held as a byte array, with a version number. Equality and hash code use the array contents.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class BinaryVersionedValue {
+   private final long version;
+   private final byte[] value;
+
+   public BinaryVersionedValue(long version, byte[] value) {
+      this.version = version;
+      this.value = value;
+   }
+
+   public long getVersion() {
+      return version;
+   }
+
+   public byte[] getValue() {
+      return value;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (o == this) return true;
+      if (o == null) return false;
+      if (!getClass().equals(o.getClass())) return false;
+      BinaryVersionedValue other = (BinaryVersionedValue) o;
+      if (version != other.version) return false;
+      return Arrays.equals(value, other.value);
+   }
+
+   @Override
+   public int hashCode() {
+      int versionHash = (int) (version ^ (version >>> 32));
+      // Arrays.hashCode(null) is 0, which is what the explicit null branch used to produce
+      return 31 * versionHash + Arrays.hashCode(value);
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,68 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.EnumMap;
+
+/**
+ * Numeric constants used by the Hot Rod client: magic values, version, operation codes, status codes, intelligence levels.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface HotrodConstants {
+
+   short REQUEST_MAGIC = 0xA0;
+   short RESPONSE_MAGIC = 0xA1;
+
+   byte HOTROD_VERSION = 10;
+
+   //requests
+   byte PUT_REQUEST = 0x01;
+   byte GET_REQUEST = 0x03;
+   byte PUT_IF_ABSENT_REQUEST = 0x05;
+   byte REPLACE_REQUEST = 0x07;
+   byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+   byte REMOVE_REQUEST = 0x0B;
+   byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+   byte CONTAINS_KEY_REQUEST = 0x0F;
+   byte PUT_FOR_EXTERNAL_READ_REQUEST = 0x11;
+   byte GET_WITH_CAS_REQUEST = 0x13;
+   byte EVICT_REQUEST = 0x15;
+   byte CLEAR_REQUEST = 0x17;
+   byte STATS_REQUEST = 0x19;
+   byte QUIT_REQUEST = 0x1B;
+   byte EVENT_REGISTRATION_REQUEST = 0x1D;
+
+   //responses
+   byte PUT_RESPONSE = 0x02;
+   byte GET_RESPONSE = 0x04;
+   byte PUT_IF_ABSENT_RESPONSE = 0x06;
+   byte REPLACE_RESPONSE = 0x08;
+   byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+   byte REMOVE_RESPONSE = 0x0C;
+   byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+   byte CONTAINS_KEY_RESPONSE = 0x10;
+   byte PUT_FOR_EXTERNAL_READ_RESPONSE = 0x12;
+   byte GET_WITH_CAS_RESPONSE = 0x14;
+   byte EVICT_RESPONSE = 0x16;
+   byte CLEAR_RESPONSE = 0x18;
+   byte STATS_RESPONSE = 0x1A;
+   byte QUIT_RESPONSE = 0x1C;
+   byte EVENT_REGISTRATION_RESPONSE = 0x1E;
+   byte ERROR_RESPONSE = 0x50;
+
+   //response status, listed in ascending numeric order
+   byte NO_ERROR_STATUS = 0x00;
+   byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+   int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+   int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+   int UNKNOWN_COMMAND_STATUS = 0x82;
+   int UNKNOWN_VERSION_STATUS = 0x83;
+   int REQUEST_PARSING_ERROR_STATUS = 0x84;
+   int SERVER_ERROR_STATUS = 0x85;
+   int COMMAND_TIMEOUT_STATUS = 0x86;
+
+   //client intelligence levels
+   byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+   byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+   byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,14 @@
+package org.infinispan.client.hotrod.impl;
+
+/**
+ * Converts objects to byte arrays and back, via {@link #marshallObject(Object)} and {@link #readObject(byte[])}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodMarshaller {
+
+ byte[] marshallObject(Object toMarshall);
+
+ Object readObject(byte[] bytes);
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,82 @@
+package org.infinispan.client.hotrod.impl;
+
+
+import org.infinispan.client.hotrod.Flag;
+
+import java.util.Map;
+
+/**
+ * Byte-array level operations that can be issued against a remote cache. Keys and values are
+ * already marshalled; {@link RemoteCacheImpl} is the object-level caller visible in this module.
+ *
+ * - TODO - add timeout support
+ * - TODO - enforce encoding and add such tests
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface HotrodOperations {
+
+ /**
+ * Outcome of a version-conditional operation ({@link #replaceIfUnmodified} /
+ * {@link #removeIfUnmodified}). Only {@link #SUCCESS} reports an update.
+ */
+ public enum VersionedOperationResponse {
+ SUCCESS(true), NO_SUCH_KEY(false), MODIFIED_KEY(false);
+ // true only for the constant that represents an applied change
+ private boolean isModified;
+
+ VersionedOperationResponse(boolean modified) {
+ isModified = modified;
+ }
+
+ /**
+ * @return true if the operation changed the entry, false otherwise
+ */
+ public boolean isUpdated() {
+ return isModified;
+ }
+ }
+
+ /**
+ * Fetches the value stored under the given key. {@link HotrodOperationsImpl} returns null when the
+ * server reports that the key does not exist.
+ */
+ public byte[] get(byte[] key, Flag... flags);
+
+ /**
+ * Removes the entry stored under the given key. {@link HotrodOperationsImpl} returns null when the key
+ * does not exist, and otherwise only reads a previous value when {@link Flag#FORCE_RETURN_VALUE} is passed.
+ */
+ public byte[] remove(byte[] key, Flag... flags);
+
+ /**
+ * @return true if the server holds an entry for the given key, false otherwise
+ */
+ public boolean containsKey(byte[] key, Flag... flags);
+
+ /**
+ * Fetches the value together with its version. Returns null if the given key does not exist.
+ */
+ public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
+
+ /**
+ * Stores the given value under the given key.
+ *
+ * @param lifespan number of seconds during which the entry is allowed to live.
+ * If the number of seconds is bigger than 30 days, it is treated as UNIX time and so represents
+ * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
+ * @param maxIdle number of seconds that an entry can be idle before it is evicted from the cache. If 0, no max
+ * idle time is applied.
+ * @param flags optional flags for this invocation; may be null or empty
+ */
+ public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * Stores the given value only if no entry exists for the key.
+ *
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param flags optional flags for this invocation; may be null or empty
+ */
+ public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * Replaces the value of an existing entry.
+ *
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param flags optional flags for this invocation; may be null or empty
+ */
+ public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * Replaces the value of an entry provided its version still matches the supplied one.
+ *
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param version the version the caller expects the entry to have, e.g. from {@link #getWithVersion}
+ * @param flags optional flags for this invocation; may be null or empty
+ */
+ public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
+
+ /**
+ * Removes an entry provided its version still matches the supplied one.
+ */
+ public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
+
+ /**
+ * Removes all entries from the remote cache.
+ */
+ public void clear(Flag... flags);
+
+ /**
+ * @return server statistics as name/value pairs
+ */
+ public Map<String, String> stats();
+
+ /**
+ * @return the value of the single named statistic
+ */
+ public String stats(String paramName);
+
+ /**
+ * Checks whether the server can be reached. NOTE(review): the exact meaning of the
+ * returned flag is not defined yet, the only implementation is a stub.
+ */
+ public boolean ping();
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,282 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TimeoutException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
+
+ private final byte[] cacheNameBytes;
+ private static final AtomicLong MSG_ID = new AtomicLong();
+ private TransportFactory transportFactory;
+ private byte clientIntelligence;
+
+ /**
+  * Creates the operations facade for one named cache.
+  *
+  * @param cacheName        name of the remote cache, sent in every request header
+  * @param transportFactory source of the transports used to talk to the server
+  */
+ public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
+    // Hot Rod strings are UTF-8 on the wire; do not depend on the platform default charset.
+    cacheNameBytes = cacheName.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+    this.transportFactory = transportFactory;
+    // Only basic clients are supported for now; without this the header would carry 0,
+    // which is not one of the CLIENT_INTELLIGENCE_* levels.
+    this.clientIntelligence = CLIENT_INTELLIGENCE_BASIC;
+ }
+
+ public byte[] get(byte[] key, Flag[] flags) {
+ Transport transport = getTransport();
+ try {
+ short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ }
+ if (status == NO_ERROR_STATUS) {
+ return readValue(transport);
+ }
+ } finally {
+ transport.release();
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public byte[] remove(byte[] key, Flag[] flags) {
+ Transport transport = getTransport();
+ try {
+ short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ } else if (status == NO_ERROR_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ }
+ } finally {
+ transport.release();
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public boolean containsKey(byte[] key, Flag... flags) {
+ Transport transport = getTransport();
+ try {
+ short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return false;
+ } else if (status == NO_ERROR_STATUS) {
+ return true;
+ }
+ } finally {
+ transport.release();
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
+ Transport transport = getTransport();
+ try {
+ short status = sendKeyOperation(key, transport, GET_WITH_CAS_REQUEST, flags, GET_WITH_CAS_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ }
+ if (status == NO_ERROR_STATUS) {
+ long version = transport.readVLong();
+ byte[] value = readValue(transport);
+ return new BinaryVersionedValue(version, value);
+ }
+ } finally {
+ transport.release();
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+
+ /**
+  * Sends a PUT. Any status other than no-error is reported as an invalid response; on
+  * success the previous value is read only if the force-return flag was supplied.
+  */
+ public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+    Transport transport = getTransport();
+    try {
+       short responseStatus = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+       if (responseStatus == NO_ERROR_STATUS) {
+          return returnPossiblePrevValue(transport, flags);
+       }
+       throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(responseStatus));
+    } finally {
+       transport.release();
+    }
+ }
+
+ public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+ Transport transport = getTransport();
+ try {
+ short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+ if (status == NO_ERROR_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ } else if (status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ return null;
+ }
+ } finally {
+ transport.release();
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+ return null;
+ }
+
+ public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
+ return null; // TODO: Customise this generated block
+ }
+
+ public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
+ return null; // TODO: Customise this generated block
+ }
+
+ public void clear(Flag... flags) {
+ // TODO: Customise this generated block
+ }
+
+ public Map<String, String> stats() {
+ return null; // TODO: Customise this generated block
+ }
+
+ public String stats(String paramName) {
+ return null; // TODO: Customise this generated block
+ }
+
+ public Transport getTransport() {
+ return transportFactory.getTransport();
+ }
+
+ /**
+ * Performs the request/response exchange shared by all key-only operations: writes the
+ * header and the key, flushes, then reads and validates the response header.
+ *
+ * @param key key bytes, written length-prefixed after the header
+ * @param transport transport to use; it is not released here
+ * @param opCode request operation code
+ * @param flags flags to encode in the header, may be null
+ * @param opRespCode response operation code expected for this request
+ * @return the response status, provided checkForErrorsInResponseStatus accepted it
+ */
+ private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
+ // 1) write [header][key length][key]
+ long messageId = writeHeader(transport, opCode, flags);
+ transport.writeBytesArray(key);
+ transport.flush();
+
+ // 2) now read the header
+ short status = readHeaderAndValidate(transport, messageId, opCode, opRespCode);
+
+ // 3) process possible error messages
+ checkForErrorsInResponseStatus(status, messageId, transport);
+
+ return status;
+ }
+
+ @Override
+ public boolean ping() {
+ return false; // TODO: Customise this generated block
+ }
+
+ /**
+ * Performs the request/response exchange shared by the put-style operations. The request
+ * is laid out as [header][key length][key][lifespan][max idle][value length][value].
+ *
+ * @param key key bytes
+ * @param value value bytes
+ * @param transport transport to use; it is not released here
+ * @param opCode request operation code
+ * @param opRespCode response operation code expected for this request
+ * @param lifespan lifespan, written as a vint
+ * @param maxIdle max idle time, written as a vint
+ * @param flags flags to encode in the header, may be null
+ * @return the response status, provided checkForErrorsInResponseStatus accepted it
+ */
+
+ private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
+ // 1) write header
+ long messageId = writeHeader(transport, opCode, flags);
+
+ // 2) write key and value
+ transport.writeBytesArray(key);
+ transport.writeVInt(lifespan);
+ transport.writeVInt(maxIdle);
+ transport.writeBytesArray(value);
+ transport.flush();
+
+ // 3) now read header
+ short status = readHeaderAndValidate(transport, messageId, opCode, opRespCode);
+ checkForErrorsInResponseStatus(status, messageId, transport);
+
+ //return status (not error status for sure)
+ return status;
+ }
+
+ /*
+ * Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
+ */
+
+ private long writeHeader(Transport transport, short operationCode, Flag... flags) {
+ transport.appendUnsignedByte(REQUEST_MAGIC);
+ long messageId = MSG_ID.incrementAndGet();
+ transport.writeVLong(messageId);
+ transport.writeByte(HOTROD_VERSION);
+ transport.writeByte(operationCode);
+ transport.writeBytesArray(cacheNameBytes);
+ int flagInt = 0;
+ if (flags != null) {
+ for (Flag flag : flags) {
+ flagInt = flag.getFlagInt() | flagInt;
+ }
+ }
+ transport.writeVInt(flagInt);
+ transport.writeByte(clientIntelligence);
+ transport.writeVInt(0);//this will be changed once smarter clients are supported
+ return messageId;
+ }
+
+ /**
+  * Reads the response header and checks it against the request that was sent. Layout:
+  * Magic | Message Id | Op code | Status | Topology Change Marker
+  *
+  * @param transport  source of the response bytes
+  * @param messageId  id that was written in the request header
+  * @param opCode     request operation code (kept for callers; not part of the validation)
+  * @param opRespCode response operation code expected for the request
+  * @return the status byte of the response
+  * @throws InvalidResponseException if the magic, the message id or the response operation code
+  *                                  do not match what is expected
+  */
+ private short readHeaderAndValidate(Transport transport, long messageId, short opCode, short opRespCode) {
+    short magic = transport.readByte();
+    if (magic != RESPONSE_MAGIC) {
+       throw new InvalidResponseException("Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic));
+    }
+    long receivedMessageId = transport.readVLong();
+    if (receivedMessageId != messageId) {
+       throw new InvalidResponseException("Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId));
+    }
+    short receivedOpCode = transport.readByte();
+    if (receivedOpCode != opRespCode) {
+       if (receivedOpCode == ERROR_RESPONSE) {
+          // an error response carries an error status; the call below is expected to throw
+          checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
+          throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
+       }
+       // report the response code that was compared against, not the request code
+       throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
+    }
+    short status = transport.readByte();
+    transport.readByte(); //todo - this will be changed once we support smarter than basic clients
+    return status;
+ }
+
+ /**
+  * Turns error statuses into exceptions and lets the regular statuses through.
+  *
+  * @param status    status read from the response header
+  * @param messageId id of the request, attached to the exception for error statuses
+  * @param transport transport from which the error message is read for error statuses
+  * @throws HotRodClientException  for the server-reported error statuses, with the server's message
+  * @throws TimeoutException       for the command timeout status
+  * @throws IllegalStateException  for any status this client does not know
+  */
+ private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+    switch ((int) status) {
+       case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
+       case REQUEST_PARSING_ERROR_STATUS:
+       case UNKNOWN_COMMAND_STATUS:
+       case SERVER_ERROR_STATUS:
+       case UNKNOWN_VERSION_STATUS: {
+          throw new HotRodClientException(transport.readString(), messageId, status);
+       }
+       case COMMAND_TIMEOUT_STATUS: {
+          throw new TimeoutException();
+       }
+       case NO_ERROR_STATUS:
+       case NOT_PUT_REMOVED_REPLACED_STATUS: // regular outcome of conditional operations such as putIfAbsent
+       case KEY_DOES_NOT_EXIST_STATUS: {
+          //don't do anything, these are correct responses
+          break;
+       }
+       default: {
+          throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
+       }
+    }
+ }
+
+ /**
+  * Reads the length of the byte array (a vint) and then the byte array itself from the transport.
+  */
+ private byte[] readValue(Transport transport) {
+    // the length is read first, while the buffer argument is being built
+    return transport.readByteArray(new byte[transport.readVInt()]);
+ }
+
+ /**
+  * @return true if the given flags (which may be null) contain {@link Flag#FORCE_RETURN_VALUE}
+  */
+ private boolean hasForceReturn(Flag[] flags) {
+    if (flags != null) {
+       for (int i = 0; i < flags.length; i++) {
+          if (Flag.FORCE_RETURN_VALUE == flags[i]) {
+             return true;
+          }
+       }
+    }
+    return false;
+ }
+
+ private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
+ return hasForceReturn(flags) ? readValue(transport) : null;
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,198 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.Version;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class RemoteCacheImpl<K, V> extends RemoteCacheSupport<K,V> {
+
+ private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
+ private HotrodOperations operations;
+ private HotrodMarshaller marshaller;
+ private String name;
+
+
+ public RemoteCacheImpl(HotrodOperations operations, HotrodMarshaller marshaller, String name) {
+ this.operations = operations;
+ this.marshaller = marshaller;
+ this.name = name;
+ }
+
+ @Override
+ public boolean remove(K key, long version) {
+ HotrodOperations.VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key), version, flags());
+ return response.isUpdated();
+ }
+
+ @Override
+ public NotifyingFuture<Boolean> removeAsync(Object key, long version) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
+ HotrodOperations.VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key), obj2bytes(newValue), lifespanSeconds, maxIdleTimeSeconds, version, flags());
+ return response.isUpdated();
+ }
+
+ @Override
+ public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public VersionedValue<V> getVersioned(K key) {
+ BinaryVersionedValue value = operations.getWithVersion(obj2bytes(key), flags());
+ return binary2VersionedValue(value);
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+ for (Entry<? extends K, ? extends V> entry : map.entrySet()) {
+ put(entry.getKey(), entry.getValue(), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
+ }
+ }
+
+ @Override
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getVersion() {
+ return Version.getProtocolVersion();
+ }
+
+ @Override
+ public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+ int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+ int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ byte[] result = operations.put(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+ return (V) bytes2obj(result);
+ }
+
+
+ @Override
+ public V putIfAbsent(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+ int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+ int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ byte[] bytes = operations.putIfAbsent(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+ return (V) bytes2obj(bytes);
+ }
+
+ @Override
+ public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+ int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+ int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+ byte[] bytes = operations.replace(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+ return (V) bytes2obj(bytes);
+ }
+
+ @Override
+ public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public NotifyingFuture<Void> clearAsync() {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public NotifyingFuture<V> removeAsync(Object key) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return null; // TODO: Customise this generated block
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return operations.containsKey(obj2bytes(key), flags());
+ }
+
+ @Override
+ public V get(Object key) {
+ byte[] bytes = operations.get(obj2bytes(key), flags());
+ return (V) bytes2obj(bytes);
+ }
+
+ @Override
+ public V remove(Object key) {
+ byte[] existingValue = operations.remove(obj2bytes(key), flags());
+ return (V) bytes2obj(existingValue);
+ }
+
+ @Override
+ public void clear() {
+ operations.clear(flags());
+ }
+
+ @Override
+ public void start() {
+ // TODO: Customise this generated block
+ }
+
+ @Override
+ public void stop() {
+ // TODO: Customise this generated block
+ }
+
+
+ @Override
+ public RemoteCache withFlags(Flag... flags) {
+ this.flagsMap.set(flags);
+ return this;
+ }
+
+ /**
+ * Returns the flags stored for the current thread by {@link #withFlags(Flag...)} and clears
+ * them, so they apply to a single operation only. Returns null if no flags were set.
+ */
+ private Flag[] flags() {
+ Flag[] flags = this.flagsMap.get();
+ // consume the flags: the next operation on this thread starts without any
+ this.flagsMap.remove();
+ return flags;
+ }
+
+ private byte[] obj2bytes(Object obj) {
+ return this.marshaller.marshallObject(obj);
+ }
+
+ /**
+  * Unmarshalls the given bytes with the configured marshaller; null bytes map to a null object.
+  */
+ private Object bytes2obj(byte[] bytes) {
+    return bytes == null ? null : this.marshaller.readObject(bytes);
+ }
+
+ /**
+  * Converts a binary versioned value into its object form, keeping the version and
+  * unmarshalling the value. A null input yields null.
+  */
+ private VersionedValue<V> binary2VersionedValue(BinaryVersionedValue value) {
+    if (value != null) {
+       V unmarshalled = (V) bytes2obj(value.getValue());
+       return new VersionedValueImpl<V>(value.getVersion(), unmarshalled);
+    }
+    return null;
+ }
+
+ /**
+  * Converts a duration to whole seconds as an int, the width used by the operations layer.
+  *
+  * @param durration the duration, expressed in {@code timeUnit}
+  * @param timeUnit  unit of {@code durration}
+  * @return the duration in seconds
+  * @throws IllegalArgumentException if the number of seconds does not fit into an int; the
+  *                                  plain cast used to wrap around silently in that case
+  */
+ private int toSeconds(long durration, TimeUnit timeUnit) {
+    long seconds = timeUnit.toSeconds(durration);
+    if (seconds > Integer.MAX_VALUE || seconds < Integer.MIN_VALUE) {
+       throw new IllegalArgumentException("Duration " + durration + " " + timeUnit
+             + " does not fit into an int number of seconds");
+    }
+    return (int) seconds;
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,154 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.CacheSupport;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class RemoteCacheSupport<K,V> extends CacheSupport<K,V> implements RemoteCache<K,V> {
+
+
+ @Override
+ public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version) {
+ return replaceAsync(key, newValue, version, 0);
+ }
+
+ /**
+  * Asynchronous versioned replace with a lifespan and no max idle time. Delegates to the
+  * five-argument overload, mirroring the synchronous
+  * {@code replace(key, newValue, version, lifespanSeconds)}.
+  */
+ @Override
+ public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds) {
+    // forward the caller's lifespan; it used to be dropped in favour of 0
+    return replaceAsync(key, newValue, version, lifespanSeconds, 0);
+ }
+
+ @Override
+ public boolean replace(K key, V newValue, long version) {
+ return replace(key, newValue, version, 0);
+ }
+
+ @Override
+ public CacheManager getCacheManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean replace(K key, V newValue, long version, int lifespanSeconds) {
+ return replace(key, newValue, version, lifespanSeconds, 0);
+ }
+
+ @Override
+ public void putForExternalRead(K key, V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addListener(Object listener) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeListener(Object listener) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<Object> getListeners() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<V> values() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void evict(K key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean startBatch() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void endBatch(boolean successful) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NotifyingFuture<Boolean> removeAsync(Object key, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AdvancedCache<K, V> getAdvancedCache() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void compact() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ComponentStatus getStatus() {
+ throw new UnsupportedOperationException();
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,40 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * {@link HotrodMarshaller} implementation based on standard Java serialization.
+ * <p/>
+ * NOTE(review): {@link #readObject(byte[])} deserializes whatever bytes it is given with a plain
+ * {@link ObjectInputStream}; if those bytes can come from an untrusted party this needs a
+ * class filter or a different format.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class SerializationMarshaller implements HotrodMarshaller {
+
+   /**
+    * Serializes the given object.
+    *
+    * @param toMarshall object to serialize
+    * @return the serialized form
+    * @throws HotRodClientException if serialization fails
+    */
+   @Override
+   public byte[] marshallObject(Object toMarshall) {
+      ByteArrayOutputStream result = new ByteArrayOutputStream(1000);
+      try {
+         ObjectOutputStream oos = new ObjectOutputStream(result);
+         try {
+            oos.writeObject(toMarshall);
+            // make sure everything buffered by the object stream reached the byte array
+            oos.flush();
+         } finally {
+            oos.close();
+         }
+         return result.toByteArray();
+      } catch (IOException e) {
+         throw new HotRodClientException("Unexpected!", e);
+      }
+   }
+
+   /**
+    * Deserializes an object from the given bytes.
+    *
+    * @param bytes serialized form, as produced by {@link #marshallObject(Object)}
+    * @return the deserialized object
+    * @throws HotRodClientException if anything goes wrong while deserializing
+    */
+   @Override
+   public Object readObject(byte[] bytes) {
+      try {
+         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+         try {
+            return ois.readObject();
+         } finally {
+            ois.close();
+         }
+      } catch (Exception e) {
+         // deliberately broad: every failure is reported as a client exception, as before
+         throw new HotRodClientException("Unexpected!", e);
+      }
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,41 @@
+package org.infinispan.client.hotrod.impl;
+
+/**
+ * Low-level channel used by {@link HotrodOperationsImpl} to write requests to and read
+ * responses from a server. Instances are obtained from a {@link TransportFactory} and handed
+ * back with {@link #release()}.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface Transport {
+
+ /**
+ * Writes a byte array. The TCP implementation writes the array length as a vint first,
+ * followed by the bytes.
+ */
+ public void writeBytesArray(byte... toAppend);
+
+ /**
+ * Writes a single byte, passed in as a short.
+ */
+ public void writeByte(short toWrite);
+
+ /**
+ * Treats the trailing byte as an unsigned byte.
+ */
+ public void appendUnsignedByte(short requestMagic);
+
+ /**
+ * Writes an int in variable-length form.
+ */
+ public void writeVInt(int length);
+
+ /**
+ * Writes a long in variable-length form.
+ */
+ public void writeVLong(long l);
+
+ /**
+ * Reads a long in variable-length form.
+ */
+ public long readVLong();
+
+ /**
+ * Reads an int in variable-length form.
+ */
+ public int readVInt();
+
+ /**
+ * Flushes what was written so far; called once a request is complete.
+ */
+ public void flush();
+
+ /**
+ * Reads a single byte, returned as a short.
+ */
+ public short readByte();
+
+ /**
+ * Signals that the caller is done with this transport; callers invoke it in a finally block.
+ */
+ public void release();
+
+ /**
+ * Reads a vint which is the size; then an array having that size.
+ * NOTE(review): HotrodOperationsImpl.readValue reads the length itself and passes a buffer of
+ * exactly that size, so implementations presumably only fill the buffer - confirm.
+ * @param bufferToFill buffer to receive the bytes
+ */
+ public byte[] readByteArray(byte[] bufferToFill);
+
+ /**
+ * Reads a string; used for the error message of error responses.
+ */
+ String readString();
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.Properties;
+
+/**
+ * Source of {@link Transport} instances for the operations layer.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface TransportFactory {
+ /**
+ * @return a transport to use for one request/response exchange
+ */
+ public Transport getTransport();
+
+ /**
+ * Configures the factory from the given properties.
+ */
+ void init(Properties props);
+
+ /**
+ * Disposes of the factory. NOTE(review): presumably releases the underlying connections - confirm
+ * against the implementations.
+ */
+ void destroy();
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,31 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.RemoteCache;
+
+/**
+ * Immutable pairing of a value with the version it had when it was read.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class VersionedValueImpl<V> implements RemoteCache.VersionedValue<V> {
+
+   // both fields are set once in the constructor and never change afterwards
+   private final long version;
+
+   private final V value;
+
+   /**
+    * @param version version of the entry
+    * @param value   value of the entry
+    */
+   public VersionedValueImpl(long version, V value) {
+      this.version = version;
+      this.value = value;
+   }
+
+   /**
+    * @return the version supplied at construction
+    */
+   @Override
+   public long getVersion() {
+      return version;
+   }
+
+   /**
+    * @return the value supplied at construction
+    */
+   @Override
+   public V getValue() {
+      return value;
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,26 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * Signals a failure in the transport layer. In this module it is thrown when reading from or
+ * writing to the underlying streams fails, when the initial connection cannot be established
+ * and when a malformed variable-length number is read.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TransportException extends HotRodException {
+ /** Creates an exception without message or cause. */
+ public TransportException() {
+ }
+
+ /** Creates an exception with the given message. */
+ public TransportException(String message) {
+ super(message);
+ }
+
+ /** Creates an exception with the given message and underlying cause. */
+ public TransportException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /** Creates an exception wrapping the given cause. */
+ public TransportException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,102 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Helpers for reading and writing variable-length numbers: 7 payload bits per byte, least
+ * significant group first, with the high bit of a byte set when more bytes follow.
+ * <p/>
+ * All I/O failures are reported as {@link TransportException}.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class VHelper {
+   private static final int MAX_VINT_BYTES = 5;
+   private static final int MAX_VLONG_BYTES = 9;
+
+   /**
+    * Reads a variable-length int, consuming at most {@code MAX_VINT_BYTES} bytes.
+    *
+    * @throws TransportException if the stream ends, an I/O error occurs or the decoded
+    *                            number is negative
+    */
+   public static int readVInt(InputStream is) {
+      int result = 0;
+      for (int i = 0; i < MAX_VINT_BYTES; i++) {
+         int aByte = nextByte(is);
+         boolean hasMore = (aByte & 128) > 0;
+         aByte &= 127; //remove the continuation bit
+         result = result | (aByte << (i * 7));
+         if (!hasMore) break;
+      }
+      if (result < 0)
+         throw new TransportException("negative number read: " + result);
+      return result;
+   }
+
+   /**
+    * Writes an int in variable-length form. Non-negative values are written exactly as before.
+    * A negative value is written with its full 32-bit pattern (5 bytes) instead of being
+    * truncated to its lowest 7 bits; note that {@link #readVInt(InputStream)} rejects
+    * negative numbers.
+    *
+    * @throws TransportException if an I/O error occurs
+    */
+   public static void writeVInt(int toWrite, OutputStream os) {
+      // unsigned shift + "any bit above the low 7" test, so the sign bit cannot end the loop early
+      while ((toWrite & ~0x7F) != 0) {
+         writeByte(os, (toWrite & 0x7F) | 0x80);
+         toWrite >>>= 7;
+      }
+      writeByte(os, toWrite);
+   }
+
+   /** Writes a single byte, converting I/O failures to {@link TransportException}. */
+   private static void writeByte(OutputStream os, int currentByte) {
+      try {
+         os.write(currentByte);
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /** Reads a single byte; end of stream and I/O failures become {@link TransportException}. */
+   private static int nextByte(InputStream is) {
+      try {
+         int result = is.read();
+         if (result < 0) {
+            throw new TransportException("Unexpected end of stream " + result);
+         }
+         return result;
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Reads a variable-length long, consuming at most {@code MAX_VLONG_BYTES} bytes.
+    *
+    * @throws TransportException if the stream ends, an I/O error occurs or the decoded
+    *                            number is negative
+    */
+   public static long readVLong(InputStream is) {
+      long result = 0;
+      for (int i = 0; i < MAX_VLONG_BYTES; i++) {
+         long aByte = nextByte(is);
+         boolean hasMore = (aByte & 128) > 0;
+         aByte &= 127; //remove the continuation bit
+         result = result | (aByte << (i * 7));
+         if (!hasMore) break;
+      }
+      if (result < 0)
+         throw new TransportException("negative number read: " + result);
+      return result;
+   }
+
+
+   /**
+    * Writes a long in variable-length form. Non-negative values are written exactly as before.
+    * A negative value is written with its full 64-bit pattern (10 bytes) instead of being
+    * truncated to its lowest 7 bits.
+    * NOTE(review): {@link #readVLong(InputStream)} reads at most 9 bytes, so it cannot read
+    * such a value back - only non-negative longs round-trip.
+    *
+    * @throws TransportException if an I/O error occurs
+    */
+   public static void writeVLong(long toWrite, OutputStream os) {
+      while ((toWrite & ~0x7FL) != 0) {
+         writeByte(os, (int) ((toWrite & 0x7F) | 0x80));
+         toWrite >>>= 7;
+      }
+      writeByte(os, (int) toWrite);
+   }
+
+   /**
+    * Scratch entry point that prints two shift/multiplication results; kept for compatibility.
+    */
+   public static void main(String[] args) {
+      long zero = 0;
+      long aByte = 1;
+      System.out.println((zero | (aByte << 35)));
+      long a = 1l << 18;
+      long b = 1l << 17;
+      long ab = a * b;
+      System.out.println(ab);
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,147 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.logging.Logger;
+
+/**
+ * {@link Transport} implementation backed by a plain blocking TCP {@link Socket} to a single
+ * host/port pair. I/O failures are reported as unchecked {@link TransportException}s.
+ * <p/>
+ * The socket is only created by {@link #connect()}, so that method has to be called before any of
+ * the read or write methods.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TcpTransport implements Transport {
+
+   public static final Logger log = Logger.getLogger(TcpTransport.class.getName());
+
+   // Fully qualified so that this class needs no additional import.
+   private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+   private final String host;
+   private final int port;
+   private Socket socket;
+
+   /**
+    * Creates an unconnected transport; no network activity happens until {@link #connect()}.
+    *
+    * @param host server host name or address
+    * @param port server TCP port
+    */
+   public TcpTransport(String host, int port) {
+      this.host = host;
+      this.port = port;
+   }
+
+   /**
+    * Opens the socket to the configured host and port.
+    *
+    * @throws TransportException if the connection cannot be established
+    */
+   public void connect() {
+      try {
+         socket = new Socket(host, port);
+      } catch (IOException e) {
+         throw new TransportException("Problems establishing initial connection", e);
+      }
+   }
+
+   /**
+    * Writes one byte to the socket; only the low 8 bits of the argument are sent.
+    *
+    * @param requestMagic value to write
+    * @throws TransportException if the write fails
+    */
+   public void appendUnsignedByte(short requestMagic) {
+      try {
+         socket.getOutputStream().write(requestMagic);
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Writes an int to the socket using {@link VHelper#writeVInt}.
+    *
+    * @param length value to write
+    * @throws TransportException if the socket's output stream cannot be obtained or written to
+    */
+   public void writeVInt(int length) {
+      try {
+         VHelper.writeVInt(length, socket.getOutputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Writes a long to the socket using {@link VHelper#writeVLong}.
+    *
+    * @param l value to write
+    * @throws TransportException if the socket's output stream cannot be obtained or written to
+    */
+   public void writeVLong(long l) {
+      try {
+         VHelper.writeVLong(l, socket.getOutputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Reads a long from the socket using {@link VHelper#readVLong}.
+    *
+    * @return the decoded value
+    * @throws TransportException if the socket's input stream cannot be obtained or read from
+    */
+   public long readVLong() {
+      try {
+         return VHelper.readVLong(socket.getInputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Reads an int from the socket using {@link VHelper#readVInt}.
+    *
+    * @return the decoded value
+    * @throws TransportException if the socket's input stream cannot be obtained or read from
+    */
+   public int readVInt() {
+      try {
+         return VHelper.readVInt(socket.getInputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Writes a length-prefixed byte array: the array length via {@link #writeVInt(int)}, followed
+    * by the raw bytes.
+    *
+    * @param toAppend bytes to send
+    * @throws TransportException if the write fails
+    */
+   public void writeBytesArray(byte... toAppend) {
+      try {
+         writeVInt(toAppend.length);
+         socket.getOutputStream().write(toAppend);
+      } catch (IOException e) {
+         throw new TransportException("Problems writing data to stream", e);
+      }
+   }
+
+   /**
+    * Writes one byte to the socket; only the low 8 bits of the argument are sent.
+    *
+    * @param toWrite value to write
+    * @throws TransportException if the write fails
+    */
+   @Override
+   public void writeByte(short toWrite) {
+      try {
+         socket.getOutputStream().write(toWrite);
+      } catch (IOException e) {
+         throw new TransportException("Problems writing data to stream", e);
+      }
+   }
+
+   /**
+    * Flushes the socket's output stream.
+    *
+    * @throws TransportException if flushing fails
+    */
+   public void flush() {
+      try {
+         socket.getOutputStream().flush();
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   /**
+    * Reads one byte from the socket.
+    *
+    * @return the byte read, as a non-negative value
+    * @throws TransportException if the read fails or the end of the stream has been reached
+    */
+   public short readByte() {
+      int resultInt;
+      try {
+         resultInt = socket.getInputStream().read();
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+      if (resultInt == -1) {
+         throw new TransportException("End of stream reached!");
+      }
+      return (short) resultInt;
+   }
+
+   /**
+    * Closes the socket. A failure to close is logged and otherwise ignored; calling this on a
+    * transport that was never connected does nothing.
+    */
+   public void release() {
+      if (socket == null) {
+         return;
+      }
+      try {
+         socket.close();
+      } catch (IOException e) {
+         log.warning("Issues closing socket:" + e.getMessage());
+      }
+   }
+
+   /**
+    * Fills the whole buffer with bytes read from the socket, blocking until enough data arrived.
+    *
+    * @param bufferToFill destination buffer; its length determines how many bytes are read
+    * @return {@code bufferToFill}, completely filled
+    * @throws TransportException if the read fails or the stream ends before the buffer is full
+    */
+   public byte[] readByteArray(byte[] bufferToFill) {
+      int offset = 0;
+      try {
+         // A single read() on a socket may return fewer bytes than requested even though more
+         // are still on their way, so keep reading until the buffer is full.
+         while (offset < bufferToFill.length) {
+            int read = socket.getInputStream().read(bufferToFill, offset, bufferToFill.length - offset);
+            if (read == -1) {
+               if (offset == 0) {
+                  throw new TransportException("End of stream reached!");
+               }
+               throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + offset + " bytes!");
+            }
+            offset += read;
+         }
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+      return bufferToFill;
+   }
+
+   /**
+    * Reads a string sent as a {@link #readVLong() VLong} byte count followed by that many bytes.
+    * The bytes are decoded as UTF-8 so the result does not depend on the JVM's default charset.
+    * NOTE(review): assumes the peer encodes strings as UTF-8 - confirm against the protocol.
+    *
+    * @return the decoded string
+    * @throws TransportException if the announced length does not fit into an array or the bytes
+    *                            cannot be read
+    */
+   @Override
+   public String readString() {
+      long strLength = readVLong();
+      if (strLength < 0 || strLength > Integer.MAX_VALUE) {
+         // Casting blindly would wrap around to a bogus or negative array size.
+         throw new TransportException("Unsupported string length: " + strLength);
+      }
+      byte[] strContent = readByteArray(new byte[(int) strLength]);
+      return new String(strContent, UTF8);
+   }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,45 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.TransportFactory;
+
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * {@link TransportFactory} that creates {@link TcpTransport}s, each one freshly connected to the
+ * server configured through the <tt>hotrod-servers</tt> property.
+ * <p/>
+ * The property holds a <tt>;</tt>-separated list of <tt>host:port</tt> entries. Only the first
+ * entry is used; any further entries are ignored.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TcpTransportFactory implements TransportFactory {
+
+   private static final String SERVERS_PROPERTY = "hotrod-servers";
+
+   private String serverHost;
+   private int serverPort;
+
+   /**
+    * Reads the server address from the given properties.
+    *
+    * @param props configuration; must contain <tt>hotrod-servers</tt> with at least one
+    *              <tt>host:port</tt> entry
+    * @throws IllegalArgumentException if the property is missing, contains no entry, or its first
+    *                                  entry lacks a host or a port; a non-numeric port surfaces
+    *                                  as a {@link NumberFormatException}
+    */
+   public void init(Properties props) {
+      String servers = props.getProperty(SERVERS_PROPERTY);
+      if (servers == null) {
+         throw new IllegalArgumentException("Missing mandatory property '" + SERVERS_PROPERTY + "'");
+      }
+      StringTokenizer tokenizer = new StringTokenizer(servers, ";");
+      if (!tokenizer.hasMoreTokens()) {
+         throw new IllegalArgumentException("Property '" + SERVERS_PROPERTY + "' does not define any server: '" + servers + "'");
+      }
+      String[] serverDef = tokenizeServer(tokenizer.nextToken());
+      serverHost = serverDef[0];
+      serverPort = Integer.parseInt(serverDef[1]);
+   }
+
+   @Override
+   public void destroy() {
+      // Nothing to do: this factory keeps no reference to the transports it creates.
+   }
+
+   /**
+    * Splits a <tt>host:port</tt> entry into its two parts, trimming surrounding whitespace.
+    *
+    * @param server single server entry
+    * @return a two element array: host, then port (still as text)
+    * @throws IllegalArgumentException if the entry does not contain both a host and a port
+    */
+   private String[] tokenizeServer(String server) {
+      StringTokenizer t = new StringTokenizer(server, ":");
+      if (t.countTokens() < 2) {
+         throw new IllegalArgumentException("Invalid server definition '" + server + "', expected host:port");
+      }
+      return new String[] {t.nextToken().trim(), t.nextToken().trim()};
+   }
+
+   /**
+    * Creates a new transport and connects it to the configured server.
+    *
+    * @return a connected transport
+    */
+   @Override
+   public Transport getTransport() {
+      TcpTransport transport = new TcpTransport(serverHost, serverPort);
+      transport.connect();
+      return transport;
+   }
+}
More information about the infinispan-commits
mailing list