[jbosscache-commits] JBoss Cache SVN: r7516 - in	core/branches/flat/src/main/java/org/horizon: factories and 6	other directories.
    jbosscache-commits at lists.jboss.org 
    jbosscache-commits at lists.jboss.org
       
    Mon Jan 19 14:48:14 EST 2009
    
    
  
Author: manik.surtani at jboss.com
Date: 2009-01-19 14:48:14 -0500 (Mon, 19 Jan 2009)
New Revision: 7516
Added:
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java
Modified:
   core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java
   core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java
   core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java
   core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
   core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java
   core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java
   core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
   core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
Log:
Abstracted more transport-level stuff
Modified: core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -23,9 +23,6 @@
 
 import org.horizon.remoting.RPCManager;
 import org.horizon.util.Util;
-import org.jgroups.Channel;
-import org.jgroups.ChannelFactory;
-import org.jgroups.util.ThreadFactory;
 
 import javax.transaction.TransactionManager;
 import java.util.concurrent.ExecutorService;
@@ -37,10 +34,7 @@
    private static final long serialVersionUID = 5626847485703341794L;
 
    private transient TransactionManager transactionManager;
-   private transient Channel channel;
-   private transient ChannelFactory muxChannelFactory;
    private RPCManager rpcManager;
-   private transient ThreadFactory evictionTimerThreadFactory;
    private transient ExecutorService asyncSerializationExecutor;
    private transient ExecutorService asyncCacheListenerExecutor;
 
@@ -48,65 +42,10 @@
     * Resets the runtime to default values.
     */
    public void reset() {
-      channel = null;
       rpcManager = null;
    }
 
 
-   /**
-    * Gets the factory the cache will use to create a multiplexed channel.
-    *
-    * @return the channel, or <code>null</code> if not set
-    */
-   public ChannelFactory getMuxChannelFactory() {
-      return muxChannelFactory;
-   }
-
-   /**
-    * Sets the factory the cache should use to create a multiplexed channel. Ignored if a Channel is directly configured
-    * via {@link #setChannel(Channel)}. If the channel factory is set, {@link Configuration#setMultiplexerStack(String)}
-    * must also be set, or a <code>CacheException</code> will be thrown during cache startup.
-    *
-    * @param multiplexerChannelFactory channel factory
-    */
-   public void setMuxChannelFactory(ChannelFactory multiplexerChannelFactory) {
-      testImmutability("muxChannelFactory");
-      this.muxChannelFactory = multiplexerChannelFactory;
-   }
-
-   /**
-    * Gets the channel the cache is using. <p/> External callers should use extreme care if they access the channel. The
-    * cache expects it has exclusive access to the channel; external code trying to send or receive messages via the
-    * channel will almost certainly disrupt the operation of the cache. </p>
-    *
-    * @return the channel. May return <code>null</code> if the channel was not externally set via {@link
-    *         #setChannel(Channel)} and the cache has not yet been started.
-    * @see #setChannel(Channel)
-    */
-   public Channel getChannel() {
-      return channel;
-   }
-
-   /**
-    * Sets the channel the cache will use.  The channel should not be connected or closed. <p/> External callers should
-    * use extreme care if they access the channel. The cache expects it has exclusive access to the channel; external
-    * code trying to send or receive messages via the channel will almost certainly disrupt the operation of the cache.
-    * </p> <p/> If an application wishes to send and receive messages using the same underlying channel as the
-    * <ocde>Cache</code>, a multiplexed channel should be used. Two separate mux channels should be created from the
-    * same <code>ChannelFactory</code> using the same <i>stack name</i> but different <code>id</code>s. See {@link
-    * ChannelFactory#createMultiplexerChannel(String,String,boolean,String)}. These two mux channels will share the same
-    * underlying channel. One of the two mux channels can be injected into the cache; the other can be used by the
-    * application.  The cache will not see the application messages and vice versa. </p> <p/> Configuring the cache to
-    * use a mux channel can also be done by configuring {@link #setMuxChannelFactory(ChannelFactory) the channel
-    * factory} and the {@link Configuration#setMultiplexerStack(String) stack name}, in which case the cache will create
-    * and use a mux channel. </p>
-    *
-    * @param channel channel to set
-    */
-   public void setChannel(Channel channel) {
-      this.channel = channel;
-   }
-
    public TransactionManager getTransactionManager() {
       return transactionManager;
    }
@@ -117,30 +56,6 @@
    }
 
    /**
-    * This is only relevant if an eviction timer thread factory has been set using {@link
-    * #setEvictionTimerThreadFactory(org.jgroups.util.ThreadFactory)}. Will return a null if the eviction timer thread
-    * factory needs to be created internally.
-    * <p/>
-    *
-    * @return the thread factory used by the eviction timer's scheduled executor.
-    * @since 1.0
-    */
-   public ThreadFactory getEvictionTimerThreadFactory() {
-      return evictionTimerThreadFactory;
-   }
-
-   /**
-    * Sets the eviction timer thread factory to use when creating a scheduled executor.  If this is not set, the
-    * eviction timer task will use a default thread factory.
-    *
-    * @param evictionTimerThreadFactory factory to use
-    * @since 1.0
-    */
-   public void setEvictionTimerThreadFactory(ThreadFactory evictionTimerThreadFactory) {
-      this.evictionTimerThreadFactory = evictionTimerThreadFactory;
-   }
-
-   /**
     * This is only relevant if the async cache replication executor has been set using {@link
     * #setAsyncSerializationExecutor(java.util.concurrent.ExecutorService)}. If the executor is created internally, this
     * method will return null.
@@ -199,10 +114,7 @@
       if (obj instanceof RuntimeConfig) {
          RuntimeConfig other = (RuntimeConfig) obj;
          return Util.safeEquals(transactionManager, other.transactionManager)
-               && Util.safeEquals(muxChannelFactory, other.muxChannelFactory)
                && Util.safeEquals(rpcManager, other.rpcManager)
-               && Util.safeEquals(channel, other.channel)
-               && Util.safeEquals(evictionTimerThreadFactory, other.evictionTimerThreadFactory)
                && Util.safeEquals(asyncCacheListenerExecutor, other.asyncCacheListenerExecutor)
                && Util.safeEquals(asyncSerializationExecutor, other.asyncSerializationExecutor);
       }
@@ -214,10 +126,7 @@
    public int hashCode() {
       int result = 17;
       result = result * 29 + (transactionManager == null ? 0 : transactionManager.hashCode());
-      result = result * 29 + (muxChannelFactory == null ? 0 : muxChannelFactory.hashCode());
       result = result * 29 + (rpcManager == null ? 0 : rpcManager.hashCode());
-      result = result * 29 + (channel == null ? 0 : channel.hashCode());
-      result = result * 29 + (evictionTimerThreadFactory == null ? 0 : evictionTimerThreadFactory.hashCode());
       result = result * 29 + (asyncCacheListenerExecutor == null ? 0 : asyncCacheListenerExecutor.hashCode());
       result = result * 29 + (asyncSerializationExecutor == null ? 0 : asyncSerializationExecutor.hashCode());
       return result;
Modified: core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -17,8 +17,10 @@
       EvictionManagerImpl evManager = new EvictionManagerImpl();
       if (configuration.getEvictionConfig() != null) {
          long wakeupInterval = configuration.getEvictionConfig().getWakeupInterval();
-         evManager.configureEvictionThread(wakeupInterval, configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
-         return (T) evManager;
+         // TODO: Fix me
+         throw new RuntimeException("Fix me");
+//         evManager.configureEvictionThread(wakeupInterval, configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
+//         return (T) evManager;
       } else return null;
    }
 }
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -124,11 +124,11 @@
    }
 
    /**
-    * Calls prepare(GlobalTransaction,List,org.jgroups.Address,boolean)) in all members except self. Waits for all
-    * responses. If one of the members failed to prepare, its return value will be an exception. If there is one
-    * exception we rethrow it. This will mark the current transaction as rolled back, which will cause the
-    * afterCompletion(int) callback to have a status of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll
-    * back the transaction.<br/> If everything runs okay, the afterCompletion(int) callback will trigger the @link
+    * Calls prepare(GlobalTransaction,List,Address,boolean)) in all members except self. Waits for all responses. If one
+    * of the members failed to prepare, its return value will be an exception. If there is one exception we rethrow it.
+    * This will mark the current transaction as rolled back, which will cause the afterCompletion(int) callback to have
+    * a status of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the transaction.<br/> If
+    * everything runs okay, the afterCompletion(int) callback will trigger the @link
     * #runCommitPhase(GlobalTransaction)). <br/>
     *
     * @throws Exception
Modified: core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -21,22 +21,54 @@
  */
 package org.horizon.io;
 
-import org.jgroups.util.Buffer;
-
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 
 /**
- * A subclass of a JGroups Buffer
+ * A byte buffer that exposes the internal byte array with minimal copying
  *
  * @author (various)
  * @since 1.0
  */
-public class ByteBuffer extends Buffer {
-   public ByteBuffer(byte[] bytes, int offset, int length) {
-      super(bytes, offset, length);
+public class ByteBuffer {
+   private final byte[] buf;
+   private final int offset;
+   private final int length;
+
+   public ByteBuffer(byte[] buf, int offset, int length) {
+      this.buf = buf;
+      this.offset = offset;
+      this.length = length;
    }
 
+   public byte[] getBuf() {
+      return buf;
+   }
+
+   public int getOffset() {
+      return offset;
+   }
+
+   public int getLength() {
+      return length;
+   }
+
+   public ByteBuffer copy() {
+      byte[] new_buf = buf != null ? new byte[length] : null;
+      int new_length = new_buf != null ? new_buf.length : 0;
+      if (new_buf != null)
+         System.arraycopy(buf, offset, new_buf, 0, length);
+      return new ByteBuffer(new_buf, 0, new_length);
+   }
+
+   public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(length).append(" bytes");
+      if (offset > 0)
+         sb.append(" (offset=").append(offset).append(")");
+      return sb.toString();
+   }
+
    /**
     * @return an input stream for the bytes in the buffer
     */
Modified: core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -30,9 +30,9 @@
 import org.horizon.lock.StripedLock;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
+import org.horizon.remoting.ResponseFilter;
+import org.horizon.remoting.transport.Address;
 import org.horizon.tree.Fqn;
-import org.jgroups.Address;
-import org.jgroups.blocks.RspFilter;
 
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -345,7 +345,7 @@
       // intentional no-op      
    }
 
-   public static class ResponseValidityFilter implements RspFilter {
+   public static class ResponseValidityFilter implements ResponseFilter {
       private int numValidResponses = 0;
       private List<Address> pendingResponders;
 
Modified: core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -37,7 +37,6 @@
 import org.horizon.util.Immutables;
 import org.jboss.util.NotImplementedException;
 import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jgroups.util.Buffer;
 
 import java.io.ByteArrayInputStream;
 import java.io.Externalizable;
@@ -117,7 +116,7 @@
 
    // implement the basic contract set in RPCDispatcher.AbstractMarshaller
    public byte[] objectToByteBuffer(Object obj) throws Exception {
-      Buffer b = objectToBuffer(obj);
+      ByteBuffer b = objectToBuffer(obj);
       byte[] bytes = new byte[b.getLength()];
       System.arraycopy(b.getBuf(), b.getOffset(), bytes, 0, b.getLength());
       return bytes;
@@ -144,8 +143,6 @@
             } else {
                throw new IllegalArgumentException("MethodCall does not have a valid method id.  Was this method call created with MethodCallFactory?");
             }
-         } else if (o instanceof org.jgroups.blocks.MethodCall) {
-            throw new IllegalArgumentException("Usage of a legacy MethodCall object!!");
          } else if (o instanceof MarshalledValue) {
             out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
             ((MarshalledValue) o).writeExternal(out);
Modified: core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -22,8 +22,8 @@
 package org.horizon.marshall;
 
 import org.horizon.commands.ReplicableCommand;
+import org.horizon.remoting.transport.Address;
 import org.horizon.transaction.GlobalTransaction;
-import org.jgroups.Address;
 
 /**
  * Common functionality used by the {@link org.horizon.interceptors.MarshalledValueInterceptor} and the {@link
@@ -47,7 +47,7 @@
             type.equals(Void.class) || type.equals(Boolean.class) || type.equals(Character.class) ||
             type.equals(Byte.class) || type.equals(Short.class) || type.equals(Integer.class) ||
             type.equals(Long.class) || type.equals(Float.class) || type.equals(Double.class) ||
-            (type.isArray() && isTypeExcluded(type.getComponentType())) || type.equals(GlobalTransaction.class) || type.equals(Address.class) ||
+            (type.isArray() && isTypeExcluded(type.getComponentType())) || type.equals(GlobalTransaction.class) || Address.class.isAssignableFrom(type) ||
             ReplicableCommand.class.isAssignableFrom(type) || type.equals(MarshalledValue.class);
    }
 }
Modified: core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -22,7 +22,6 @@
 package org.horizon.marshall;
 
 import org.horizon.io.ByteBuffer;
-import org.jgroups.blocks.RpcDispatcher;
 
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -43,9 +42,8 @@
  * information on which class loader to use to deserialize the object stream, or versioning information to allow streams
  * to interoperate between different versions of JBoss Cache (see {@link VersionAwareMarshaller}
  * <p/>
- * This interface implements the JGroups building-block interface {@link org.jgroups.blocks.RpcDispatcher.Marshaller}
- * which is used to marshall {@link org.horizon.commands.ReplicableCommand}s, their parameters and their response
- * values.
+ * This interface is used to marshall {@link org.horizon.commands.ReplicableCommand}s, their parameters and their
+ * response values.
  * <p/>
  * The interface is also used by the {@link org.horizon.loader.CacheLoader} framework to efficiently serialize data to
  * be persisted, as well as the {@link org.horizon.statetransfer.StateTransferManager} when serializing the cache for
@@ -54,7 +52,7 @@
  * @author <a href="mailto://manik@jboss.org">Manik Surtani</a>
  * @since 1.0
  */
-public interface Marshaller extends RpcDispatcher.Marshaller2 {
+public interface Marshaller {
    /**
     * Marshalls an object to a given {@link java.io.ObjectOutputStream}
     *
@@ -79,12 +77,18 @@
    Object objectFromStream(InputStream is) throws Exception;
 
    /**
-    * A specialized form of {@link org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an
-    * instance of {@link org.horizon.io.ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+    * A method that returns an instance of {@link org.horizon.io.ByteBuffer}, which allows direct access to the byte
+    * array with minimal array copying
     *
     * @param o object to marshall
     * @return a ByteBuffer
     * @throws Exception
     */
    ByteBuffer objectToBuffer(Object o) throws Exception;
+
+   Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception;
+
+   byte[] objectToByteBuffer(Object obj) throws Exception;
+
+   Object objectFromByteBuffer(byte[] buf) throws Exception;
 }
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -3,7 +3,8 @@
 import org.horizon.remoting.transport.Address;
 
 /**
- * // TODO: Manik: Document this!
+ * A mechanism of filtering RPC responses.  Used with {@link RPCManager#invokeRemotely(java.util.List,
+ * org.horizon.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter)}
  *
  * @author Manik Surtani
  * @since 1.0
@@ -14,8 +15,8 @@
     *
     * @param response The response (usually a serializable value)
     * @param sender   The sender of response
-    * @return True if we should add the response to the response list ({@link org.jgroups.util.RspList}) of a request,
-    *         otherwise false. In the latter case, we don't add the response to the response list.
+    * @return True if we should add the response to the response list of a request, otherwise false. In the latter case,
+    *         we don't add the response to the response list.
     */
    boolean isAcceptable(Object response, Address sender);
 
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -124,8 +124,9 @@
       channel.setOpt(Channel.BLOCK, true);
       dispatcher = new CommandAwareRpcDispatcher(channel, this,
                                                  asyncExecutor, inboundInvocationHandler);
-      dispatcher.setRequestMarshaller(marshaller);
-      dispatcher.setResponseMarshaller(marshaller);
+      MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
+      dispatcher.setRequestMarshaller(adapter);
+      dispatcher.setResponseMarshaller(adapter);
    }
 
    private void buildChannel() {
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java	2009-01-19 19:48:14 UTC (rev 7516)
@@ -0,0 +1,40 @@
+package org.horizon.remoting.transport.jgroups;
+
+import org.horizon.io.ByteBuffer;
+import org.horizon.marshall.Marshaller;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+/**
+ * Bridge between JGroups and Horizon marshallers
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class MarshallerAdapter implements RpcDispatcher.Marshaller2 {
+   Marshaller m;
+
+   public MarshallerAdapter(Marshaller m) {
+      this.m = m;
+   }
+
+   public Buffer objectToBuffer(Object obj) throws Exception {
+      return toBuffer(m.objectToBuffer(obj));
+   }
+
+   public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception {
+      return m.objectFromByteBuffer(buf, offset, length);
+   }
+
+   public byte[] objectToByteBuffer(Object obj) throws Exception {
+      return m.objectToByteBuffer(obj);
+   }
+
+   public Object objectFromByteBuffer(byte[] buf) throws Exception {
+      return m.objectFromByteBuffer(buf);
+   }
+
+   private Buffer toBuffer(ByteBuffer bb) {
+      return new Buffer(bb.getBuf(), bb.getOffset(), bb.getLength());
+   }
+}
    
    
More information about the jbosscache-commits
mailing list