[jbosscache-commits] JBoss Cache SVN: r7516 - in core/branches/flat/src/main/java/org/horizon: factories and 6 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Mon Jan 19 14:48:14 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-01-19 14:48:14 -0500 (Mon, 19 Jan 2009)
New Revision: 7516
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java
Modified:
core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java
core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java
core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java
core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
Log:
Abstracted more transport-level stuff
Modified: core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/config/RuntimeConfig.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -23,9 +23,6 @@
import org.horizon.remoting.RPCManager;
import org.horizon.util.Util;
-import org.jgroups.Channel;
-import org.jgroups.ChannelFactory;
-import org.jgroups.util.ThreadFactory;
import javax.transaction.TransactionManager;
import java.util.concurrent.ExecutorService;
@@ -37,10 +34,7 @@
private static final long serialVersionUID = 5626847485703341794L;
private transient TransactionManager transactionManager;
- private transient Channel channel;
- private transient ChannelFactory muxChannelFactory;
private RPCManager rpcManager;
- private transient ThreadFactory evictionTimerThreadFactory;
private transient ExecutorService asyncSerializationExecutor;
private transient ExecutorService asyncCacheListenerExecutor;
@@ -48,65 +42,10 @@
* Resets the runtime to default values.
*/
public void reset() {
- channel = null;
rpcManager = null;
}
- /**
- * Gets the factory the cache will use to create a multiplexed channel.
- *
- * @return the channel, or <code>null</code> if not set
- */
- public ChannelFactory getMuxChannelFactory() {
- return muxChannelFactory;
- }
-
- /**
- * Sets the factory the cache should use to create a multiplexed channel. Ignored if a Channel is directly configured
- * via {@link #setChannel(Channel)}. If the channel factory is set, {@link Configuration#setMultiplexerStack(String)}
- * must also be set, or a <code>CacheException</code> will be thrown during cache startup.
- *
- * @param multiplexerChannelFactory channel factory
- */
- public void setMuxChannelFactory(ChannelFactory multiplexerChannelFactory) {
- testImmutability("muxChannelFactory");
- this.muxChannelFactory = multiplexerChannelFactory;
- }
-
- /**
- * Gets the channel the cache is using. <p/> External callers should use extreme care if they access the channel. The
- * cache expects it has exclusive access to the channel; external code trying to send or receive messages via the
- * channel will almost certainly disrupt the operation of the cache. </p>
- *
- * @return the channel. May return <code>null</code> if the channel was not externally set via {@link
- * #setChannel(Channel)} and the cache has not yet been started.
- * @see #setChannel(Channel)
- */
- public Channel getChannel() {
- return channel;
- }
-
- /**
- * Sets the channel the cache will use. The channel should not be connected or closed. <p/> External callers should
- * use extreme care if they access the channel. The cache expects it has exclusive access to the channel; external
- * code trying to send or receive messages via the channel will almost certainly disrupt the operation of the cache.
- * </p> <p/> If an application wishes to send and receive messages using the same underlying channel as the
- * <ocde>Cache</code>, a multiplexed channel should be used. Two separate mux channels should be created from the
- * same <code>ChannelFactory</code> using the same <i>stack name</i> but different <code>id</code>s. See {@link
- * ChannelFactory#createMultiplexerChannel(String,String,boolean,String)}. These two mux channels will share the same
- * underlying channel. One of the two mux channels can be injected into the cache; the other can be used by the
- * application. The cache will not see the application messages and vice versa. </p> <p/> Configuring the cache to
- * use a mux channel can also be done by configuring {@link #setMuxChannelFactory(ChannelFactory) the channel
- * factory} and the {@link Configuration#setMultiplexerStack(String) stack name}, in which case the cache will create
- * and use a mux channel. </p>
- *
- * @param channel channel to set
- */
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
public TransactionManager getTransactionManager() {
return transactionManager;
}
@@ -117,30 +56,6 @@
}
/**
- * This is only relevant if an eviction timer thread factory has been set using {@link
- * #setEvictionTimerThreadFactory(org.jgroups.util.ThreadFactory)}. Will return a null if the eviction timer thread
- * factory needs to be created internally.
- * <p/>
- *
- * @return the thread factory used by the eviction timer's scheduled executor.
- * @since 1.0
- */
- public ThreadFactory getEvictionTimerThreadFactory() {
- return evictionTimerThreadFactory;
- }
-
- /**
- * Sets the eviction timer thread factory to use when creating a scheduled executor. If this is not set, the
- * eviction timer task will use a default thread factory.
- *
- * @param evictionTimerThreadFactory factory to use
- * @since 1.0
- */
- public void setEvictionTimerThreadFactory(ThreadFactory evictionTimerThreadFactory) {
- this.evictionTimerThreadFactory = evictionTimerThreadFactory;
- }
-
- /**
* This is only relevant if the async cache replication executor has been set using {@link
* #setAsyncSerializationExecutor(java.util.concurrent.ExecutorService)}. If the executor is created internally, this
* method will return null.
@@ -199,10 +114,7 @@
if (obj instanceof RuntimeConfig) {
RuntimeConfig other = (RuntimeConfig) obj;
return Util.safeEquals(transactionManager, other.transactionManager)
- && Util.safeEquals(muxChannelFactory, other.muxChannelFactory)
&& Util.safeEquals(rpcManager, other.rpcManager)
- && Util.safeEquals(channel, other.channel)
- && Util.safeEquals(evictionTimerThreadFactory, other.evictionTimerThreadFactory)
&& Util.safeEquals(asyncCacheListenerExecutor, other.asyncCacheListenerExecutor)
&& Util.safeEquals(asyncSerializationExecutor, other.asyncSerializationExecutor);
}
@@ -214,10 +126,7 @@
public int hashCode() {
int result = 17;
result = result * 29 + (transactionManager == null ? 0 : transactionManager.hashCode());
- result = result * 29 + (muxChannelFactory == null ? 0 : muxChannelFactory.hashCode());
result = result * 29 + (rpcManager == null ? 0 : rpcManager.hashCode());
- result = result * 29 + (channel == null ? 0 : channel.hashCode());
- result = result * 29 + (evictionTimerThreadFactory == null ? 0 : evictionTimerThreadFactory.hashCode());
result = result * 29 + (asyncCacheListenerExecutor == null ? 0 : asyncCacheListenerExecutor.hashCode());
result = result * 29 + (asyncSerializationExecutor == null ? 0 : asyncSerializationExecutor.hashCode());
return result;
Modified: core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/factories/EvictionManagerFactory.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -17,8 +17,10 @@
EvictionManagerImpl evManager = new EvictionManagerImpl();
if (configuration.getEvictionConfig() != null) {
long wakeupInterval = configuration.getEvictionConfig().getWakeupInterval();
- evManager.configureEvictionThread(wakeupInterval, configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
- return (T) evManager;
+ // TODO: Fix me
+ throw new RuntimeException("Fix me");
+// evManager.configureEvictionThread(wakeupInterval, configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
+// return (T) evManager;
} else return null;
}
}
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -124,11 +124,11 @@
}
/**
- * Calls prepare(GlobalTransaction,List,org.jgroups.Address,boolean)) in all members except self. Waits for all
- * responses. If one of the members failed to prepare, its return value will be an exception. If there is one
- * exception we rethrow it. This will mark the current transaction as rolled back, which will cause the
- * afterCompletion(int) callback to have a status of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll
- * back the transaction.<br/> If everything runs okay, the afterCompletion(int) callback will trigger the @link
+ * Calls prepare(GlobalTransaction,List,Address,boolean)) in all members except self. Waits for all responses. If one
+ * of the members failed to prepare, its return value will be an exception. If there is one exception we rethrow it.
+ * This will mark the current transaction as rolled back, which will cause the afterCompletion(int) callback to have
+ * a status of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the transaction.<br/> If
+ * everything runs okay, the afterCompletion(int) callback will trigger the @link
* #runCommitPhase(GlobalTransaction)). <br/>
*
* @throws Exception
Modified: core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/io/ByteBuffer.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -21,22 +21,54 @@
*/
package org.horizon.io;
-import org.jgroups.util.Buffer;
-
import java.io.ByteArrayInputStream;
import java.io.InputStream;
/**
- * A subclass of a JGroups Buffer
+ * A byte buffer that exposes the internal byte array with minimal copying
*
* @author (various)
* @since 1.0
*/
-public class ByteBuffer extends Buffer {
- public ByteBuffer(byte[] bytes, int offset, int length) {
- super(bytes, offset, length);
+public class ByteBuffer {
+ private final byte[] buf;
+ private final int offset;
+ private final int length;
+
+ public ByteBuffer(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
}
+ public byte[] getBuf() {
+ return buf;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public ByteBuffer copy() {
+ byte[] new_buf = buf != null ? new byte[length] : null;
+ int new_length = new_buf != null ? new_buf.length : 0;
+ if (new_buf != null)
+ System.arraycopy(buf, offset, new_buf, 0, length);
+ return new ByteBuffer(new_buf, 0, new_length);
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(length).append(" bytes");
+ if (offset > 0)
+ sb.append(" (offset=").append(offset).append(")");
+ return sb.toString();
+ }
+
/**
* @return an input stream for the bytes in the buffer
*/
Modified: core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -30,9 +30,9 @@
import org.horizon.lock.StripedLock;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.remoting.ResponseFilter;
+import org.horizon.remoting.transport.Address;
import org.horizon.tree.Fqn;
-import org.jgroups.Address;
-import org.jgroups.blocks.RspFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -345,7 +345,7 @@
// intentional no-op
}
- public static class ResponseValidityFilter implements RspFilter {
+ public static class ResponseValidityFilter implements ResponseFilter {
private int numValidResponses = 0;
private List<Address> pendingResponders;
Modified: core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/CacheMarshallerStarobrno.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -37,7 +37,6 @@
import org.horizon.util.Immutables;
import org.jboss.util.NotImplementedException;
import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jgroups.util.Buffer;
import java.io.ByteArrayInputStream;
import java.io.Externalizable;
@@ -117,7 +116,7 @@
// implement the basic contract set in RPCDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception {
- Buffer b = objectToBuffer(obj);
+ ByteBuffer b = objectToBuffer(obj);
byte[] bytes = new byte[b.getLength()];
System.arraycopy(b.getBuf(), b.getOffset(), bytes, 0, b.getLength());
return bytes;
@@ -144,8 +143,6 @@
} else {
throw new IllegalArgumentException("MethodCall does not have a valid method id. Was this method call created with MethodCallFactory?");
}
- } else if (o instanceof org.jgroups.blocks.MethodCall) {
- throw new IllegalArgumentException("Usage of a legacy MethodCall object!!");
} else if (o instanceof MarshalledValue) {
out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
((MarshalledValue) o).writeExternal(out);
Modified: core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/MarshalledValueHelper.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -22,8 +22,8 @@
package org.horizon.marshall;
import org.horizon.commands.ReplicableCommand;
+import org.horizon.remoting.transport.Address;
import org.horizon.transaction.GlobalTransaction;
-import org.jgroups.Address;
/**
* Common functionality used by the {@link org.horizon.interceptors.MarshalledValueInterceptor} and the {@link
@@ -47,7 +47,7 @@
type.equals(Void.class) || type.equals(Boolean.class) || type.equals(Character.class) ||
type.equals(Byte.class) || type.equals(Short.class) || type.equals(Integer.class) ||
type.equals(Long.class) || type.equals(Float.class) || type.equals(Double.class) ||
- (type.isArray() && isTypeExcluded(type.getComponentType())) || type.equals(GlobalTransaction.class) || type.equals(Address.class) ||
+ (type.isArray() && isTypeExcluded(type.getComponentType())) || type.equals(GlobalTransaction.class) || Address.class.isAssignableFrom(type) ||
ReplicableCommand.class.isAssignableFrom(type) || type.equals(MarshalledValue.class);
}
}
Modified: core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -22,7 +22,6 @@
package org.horizon.marshall;
import org.horizon.io.ByteBuffer;
-import org.jgroups.blocks.RpcDispatcher;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -43,9 +42,8 @@
* information on which class loader to use to deserialize the object stream, or versioning information to allow streams
* to interoperate between different versions of JBoss Cache (see {@link VersionAwareMarshaller}
* <p/>
- * This interface implements the JGroups building-block interface {@link org.jgroups.blocks.RpcDispatcher.Marshaller}
- * which is used to marshall {@link org.horizon.commands.ReplicableCommand}s, their parameters and their response
- * values.
+ * This interface is used to marshall {@link org.horizon.commands.ReplicableCommand}s, their parameters and their
+ * response values.
* <p/>
* The interface is also used by the {@link org.horizon.loader.CacheLoader} framework to efficiently serialize data to
* be persisted, as well as the {@link org.horizon.statetransfer.StateTransferManager} when serializing the cache for
@@ -54,7 +52,7 @@
* @author <a href="mailto://manik@jboss.org">Manik Surtani</a>
* @since 1.0
*/
-public interface Marshaller extends RpcDispatcher.Marshaller2 {
+public interface Marshaller {
/**
* Marshalls an object to a given {@link java.io.ObjectOutputStream}
*
@@ -79,12 +77,18 @@
Object objectFromStream(InputStream is) throws Exception;
/**
- * A specialized form of {@link org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an
- * instance of {@link org.horizon.io.ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+ * A method that returns an instance of {@link org.horizon.io.ByteBuffer}, which allows direct access to the byte
+ * array with minimal array copying
*
* @param o object to marshall
* @return a ByteBuffer
* @throws Exception
*/
ByteBuffer objectToBuffer(Object o) throws Exception;
+
+ Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception;
+
+ byte[] objectToByteBuffer(Object obj) throws Exception;
+
+ Object objectFromByteBuffer(byte[] buf) throws Exception;
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ResponseFilter.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -3,7 +3,8 @@
import org.horizon.remoting.transport.Address;
/**
- * // TODO: Manik: Document this!
+ * A mechanism of filtering RPC responses. Used with {@link RPCManager#invokeRemotely(java.util.List,
+ * org.horizon.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter)}
*
* @author Manik Surtani
* @since 1.0
@@ -14,8 +15,8 @@
*
* @param response The response (usually a serializable value)
* @param sender The sender of response
- * @return True if we should add the response to the response list ({@link org.jgroups.util.RspList}) of a request,
- * otherwise false. In the latter case, we don't add the response to the response list.
+ * @return True if we should add the response to the response list of a request, otherwise false. In the latter case,
+ * we don't add the response to the response list.
*/
boolean isAcceptable(Object response, Address sender);
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-01-19 19:24:50 UTC (rev 7515)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -124,8 +124,9 @@
channel.setOpt(Channel.BLOCK, true);
dispatcher = new CommandAwareRpcDispatcher(channel, this,
asyncExecutor, inboundInvocationHandler);
- dispatcher.setRequestMarshaller(marshaller);
- dispatcher.setResponseMarshaller(marshaller);
+ MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
+ dispatcher.setRequestMarshaller(adapter);
+ dispatcher.setResponseMarshaller(adapter);
}
private void buildChannel() {
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/MarshallerAdapter.java 2009-01-19 19:48:14 UTC (rev 7516)
@@ -0,0 +1,40 @@
+package org.horizon.remoting.transport.jgroups;
+
+import org.horizon.io.ByteBuffer;
+import org.horizon.marshall.Marshaller;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+/**
+ * Bridge between JGroups and Horizon marshallers
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class MarshallerAdapter implements RpcDispatcher.Marshaller2 {
+ Marshaller m;
+
+ public MarshallerAdapter(Marshaller m) {
+ this.m = m;
+ }
+
+ public Buffer objectToBuffer(Object obj) throws Exception {
+ return toBuffer(m.objectToBuffer(obj));
+ }
+
+ public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception {
+ return m.objectFromByteBuffer(buf, offset, length);
+ }
+
+ public byte[] objectToByteBuffer(Object obj) throws Exception {
+ return m.objectToByteBuffer(obj);
+ }
+
+ public Object objectFromByteBuffer(byte[] buf) throws Exception {
+ return m.objectFromByteBuffer(buf);
+ }
+
+ private Buffer toBuffer(ByteBuffer bb) {
+ return new Buffer(bb.getBuf(), bb.getOffset(), bb.getLength());
+ }
+}
More information about the jbosscache-commits
mailing list