[jbosscache-commits] JBoss Cache SVN: r5316 - in core/trunk/src: main/java/org/jboss/cache/config and 6 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Feb 6 09:20:41 EST 2008
Author: manik.surtani at jboss.com
Date: 2008-02-06 09:20:40 -0500 (Wed, 06 Feb 2008)
New Revision: 5316
Removed:
core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java
core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java
core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java
Modified:
core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Log:
Documented stream pool sizes, fixed more tests
Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2008-02-06 14:20:40 UTC (rev 5316)
@@ -708,6 +708,34 @@
</para>
</entry>
</row>
+
+ <row>
+ <entry>
+ <para>ObjectInputStreamPoolSize and ObjectOutputStreamPoolSize</para>
+ </entry>
+
+ <entry>
+ <para>
+ Since JBoss Cache 2.1.0, object input and output streams - used to serialize and deserialize RPC
+ calls in a cluster - are pooled to reduce the overhead of constructing such streams. They are
+ reused
+ by making use of special resettable stream implementations.
+ </para>
+ <para>
+ By default, these stream pools are set at
+ <literal>50</literal>
+ objects each. You could increase or decrease the pool
+ size if, while profiling, you see a lot of threads blocking on
+ <literal>ObjectStreamPool.getInputStream()</literal>
+ or <literal>ObjectStreamPool.getOutputStream()</literal>. In general, having more streams is
+ better
+ than having fewer than needed. Based on your application, make sure you have more streams
+ available
+ than the number of threads you expect to concurrently write to the cache.
+ </para>
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</informaltable>
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -187,6 +187,8 @@
private String marshallerClass;
private ShutdownHookBehavior shutdownHookBehavior = ShutdownHookBehavior.DEFAULT;
private boolean useLazyDeserialization = false;
+ private int objectInputStreamPoolSize = 50;
+ private int objectOutputStreamPoolSize = 50;
// ------------------------------------------------------------------------------------------------------------
// SETTERS - MAKE SURE ALL SETTERS PERFORM testImmutability()!!!
@@ -488,9 +490,34 @@
public void setUseLazyDeserialization(boolean useLazyDeserialization)
{
+ testImmutability("useLazyDeserialization");
this.useLazyDeserialization = useLazyDeserialization;
}
+ /**
+ * Initialises the size of the object input stream pool, which defaults to 50.
+ *
+ * @param objectInputStreamPoolSize
+ * @since 2.1.0
+ */
+ public void setObjectInputStreamPoolSize(int objectInputStreamPoolSize)
+ {
+ testImmutability("objectInputStreamPoolSize");
+ this.objectInputStreamPoolSize = objectInputStreamPoolSize;
+ }
+
+ /**
+ * Initialises the size of the object output stream pool, which defaults to 50.
+ *
+ * @param objectOutputStreamPoolSize
+ * @since 2.1.0
+ */
+ public void setObjectOutputStreamPoolSize(int objectOutputStreamPoolSize)
+ {
+ testImmutability("objectOutputStreamPoolSize");
+ this.objectOutputStreamPoolSize = objectOutputStreamPoolSize;
+ }
+
// ------------------------------------------------------------------------------------------------------------
// GETTERS
// ------------------------------------------------------------------------------------------------------------
@@ -680,6 +707,25 @@
}
/**
+ * @return the size of the object input stream pool
+ * @since 2.1.0
+ */
+ public int getObjectInputStreamPoolSize()
+ {
+ return objectInputStreamPoolSize;
+ }
+
+ /**
+ * @return the size of the object output stream pool
+ * @since 2.1.0
+ */
+ public int getObjectOutputStreamPoolSize()
+ {
+ return objectOutputStreamPoolSize;
+ }
+
+
+ /**
* Returns a {@link java.net.URL} to a default JGroups configuration file.
*
* @return a default JGroups config file
@@ -699,92 +745,93 @@
// OVERRIDDEN METHODS
// ------------------------------------------------------------------------------------------------------------
-
+ @Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- final Configuration that = (Configuration) o;
+ Configuration that = (Configuration) o;
+ if (exposeManagementStatistics != that.exposeManagementStatistics) return false;
if (fetchInMemoryState != that.fetchInMemoryState) return false;
if (inactiveOnStartup != that.inactiveOnStartup) return false;
- if (stateRetrievalTimeout != that.stateRetrievalTimeout) return false;
if (lockAcquisitionTimeout != that.lockAcquisitionTimeout) return false;
+ if (lockParentForChildInsertRemove != that.lockParentForChildInsertRemove) return false;
if (nodeLockingOptimistic != that.nodeLockingOptimistic) return false;
+ if (objectInputStreamPoolSize != that.objectInputStreamPoolSize) return false;
+ if (objectOutputStreamPoolSize != that.objectOutputStreamPoolSize) return false;
if (replQueueInterval != that.replQueueInterval) return false;
if (replQueueMaxElements != that.replQueueMaxElements) return false;
if (replicationVersion != that.replicationVersion) return false;
+ if (stateRetrievalTimeout != that.stateRetrievalTimeout) return false;
if (syncCommitPhase != that.syncCommitPhase) return false;
if (syncReplTimeout != that.syncReplTimeout) return false;
if (syncRollbackPhase != that.syncRollbackPhase) return false;
- if (exposeManagementStatistics != that.exposeManagementStatistics) return false;
+ if (useLazyDeserialization != that.useLazyDeserialization) return false;
if (useRegionBasedMarshalling != that.useRegionBasedMarshalling) return false;
if (useReplQueue != that.useReplQueue) return false;
+ if (usingMultiplexer != that.usingMultiplexer) return false;
if (buddyReplicationConfig != null ? !buddyReplicationConfig.equals(that.buddyReplicationConfig) : that.buddyReplicationConfig != null)
- {
return false;
- }
if (cacheLoaderConfig != null ? !cacheLoaderConfig.equals(that.cacheLoaderConfig) : that.cacheLoaderConfig != null)
- {
return false;
- }
if (cacheMode != that.cacheMode) return false;
if (clusterConfig != null ? !clusterConfig.equals(that.clusterConfig) : that.clusterConfig != null) return false;
if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false;
if (evictionConfig != null ? !evictionConfig.equals(that.evictionConfig) : that.evictionConfig != null)
- {
return false;
- }
if (isolationLevel != that.isolationLevel) return false;
+ if (marshaller != null ? !marshaller.equals(that.marshaller) : that.marshaller != null) return false;
+ if (marshallerClass != null ? !marshallerClass.equals(that.marshallerClass) : that.marshallerClass != null)
+ return false;
if (muxStackName != null ? !muxStackName.equals(that.muxStackName) : that.muxStackName != null) return false;
if (nodeLockingScheme != that.nodeLockingScheme) return false;
+ if (runtimeConfig != null ? !runtimeConfig.equals(that.runtimeConfig) : that.runtimeConfig != null) return false;
+ if (shutdownHookBehavior != that.shutdownHookBehavior) return false;
if (transactionManagerLookupClass != null ? !transactionManagerLookupClass.equals(that.transactionManagerLookupClass) : that.transactionManagerLookupClass != null)
- {
return false;
- }
- if (!safeEquals(runtimeConfig, that.runtimeConfig))
- {
- return false;
- }
- if (!safeEquals(marshallerClass, that.marshallerClass)) return false;
-
- if (lockParentForChildInsertRemove != that.lockParentForChildInsertRemove) return false;
-
return true;
}
+ @Override
public int hashCode()
{
- int result = 17;
- result = 29 * result + (clusterName != null ? clusterName.hashCode() : 0);
- result = 29 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
- result = 29 * result + (useReplQueue ? 1 : 0);
- result = 29 * result + replQueueMaxElements;
- result = 29 * result + (int) (replQueueInterval ^ (replQueueInterval >>> 32));
- result = 29 * result + (exposeManagementStatistics ? 1 : 0);
- result = 29 * result + (fetchInMemoryState ? 1 : 0);
- result = 29 * result + (int) replicationVersion;
- result = 29 * result + (int) (lockAcquisitionTimeout ^ (lockAcquisitionTimeout >>> 32));
- result = 29 * result + (int) (syncReplTimeout ^ (syncReplTimeout >>> 32));
- result = 29 * result + (cacheMode != null ? cacheMode.hashCode() : 0);
- result = 29 * result + (inactiveOnStartup ? 1 : 0);
- result = 29 * result + (int) (stateRetrievalTimeout ^ (stateRetrievalTimeout >>> 32));
- result = 29 * result + (isolationLevel != null ? isolationLevel.hashCode() : 0);
- result = 29 * result + (evictionConfig != null ? evictionConfig.hashCode() : 0);
- result = 29 * result + (useRegionBasedMarshalling ? 1 : 0);
- result = 29 * result + (transactionManagerLookupClass != null ? transactionManagerLookupClass.hashCode() : 0);
- result = 29 * result + (cacheLoaderConfig != null ? cacheLoaderConfig.hashCode() : 0);
- result = 29 * result + (syncCommitPhase ? 1 : 0);
- result = 29 * result + (syncRollbackPhase ? 1 : 0);
- result = 29 * result + (buddyReplicationConfig != null ? buddyReplicationConfig.hashCode() : 0);
- result = 29 * result + (nodeLockingOptimistic ? 1 : 0);
- result = 29 * result + (nodeLockingScheme != null ? nodeLockingScheme.hashCode() : 0);
- result = 29 * result + (muxStackName != null ? muxStackName.hashCode() : 0);
- result = 29 * result + (runtimeConfig != null ? runtimeConfig.hashCode() : 0);
- result = 29 * result + (marshallerClass != null ? marshallerClass.hashCode() : 0);
- result = 29 * result + (lockParentForChildInsertRemove ? 1 : 0);
+ int result;
+ result = (marshaller != null ? marshaller.hashCode() : 0);
+ result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
+ result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
+ result = 31 * result + (useReplQueue ? 1 : 0);
+ result = 31 * result + replQueueMaxElements;
+ result = 31 * result + (int) (replQueueInterval ^ (replQueueInterval >>> 32));
+ result = 31 * result + (exposeManagementStatistics ? 1 : 0);
+ result = 31 * result + (fetchInMemoryState ? 1 : 0);
+ result = 31 * result + (int) replicationVersion;
+ result = 31 * result + (int) (lockAcquisitionTimeout ^ (lockAcquisitionTimeout >>> 32));
+ result = 31 * result + (int) (syncReplTimeout ^ (syncReplTimeout >>> 32));
+ result = 31 * result + (cacheMode != null ? cacheMode.hashCode() : 0);
+ result = 31 * result + (inactiveOnStartup ? 1 : 0);
+ result = 31 * result + (int) (stateRetrievalTimeout ^ (stateRetrievalTimeout >>> 32));
+ result = 31 * result + (isolationLevel != null ? isolationLevel.hashCode() : 0);
+ result = 31 * result + (lockParentForChildInsertRemove ? 1 : 0);
+ result = 31 * result + (evictionConfig != null ? evictionConfig.hashCode() : 0);
+ result = 31 * result + (useRegionBasedMarshalling ? 1 : 0);
+ result = 31 * result + (transactionManagerLookupClass != null ? transactionManagerLookupClass.hashCode() : 0);
+ result = 31 * result + (cacheLoaderConfig != null ? cacheLoaderConfig.hashCode() : 0);
+ result = 31 * result + (syncCommitPhase ? 1 : 0);
+ result = 31 * result + (syncRollbackPhase ? 1 : 0);
+ result = 31 * result + (buddyReplicationConfig != null ? buddyReplicationConfig.hashCode() : 0);
+ result = 31 * result + (nodeLockingOptimistic ? 1 : 0);
+ result = 31 * result + (nodeLockingScheme != null ? nodeLockingScheme.hashCode() : 0);
+ result = 31 * result + (muxStackName != null ? muxStackName.hashCode() : 0);
+ result = 31 * result + (usingMultiplexer ? 1 : 0);
+ result = 31 * result + (runtimeConfig != null ? runtimeConfig.hashCode() : 0);
+ result = 31 * result + (marshallerClass != null ? marshallerClass.hashCode() : 0);
+ result = 31 * result + (shutdownHookBehavior != null ? shutdownHookBehavior.hashCode() : 0);
+ result = 31 * result + (useLazyDeserialization ? 1 : 0);
+ result = 31 * result + objectInputStreamPoolSize;
+ result = 31 * result + objectOutputStreamPoolSize;
return result;
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -8,6 +8,7 @@
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jboss.cache.marshall.io.ObjectStreamPool;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.remoting.jgroups.CacheMessageListener;
import org.jboss.cache.statetransfer.StateTransferManager;
@@ -21,7 +22,7 @@
*/
@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class,
CacheMessageListener.class, CacheLoaderManager.class, RemoteCacheInvocationDelegate.class, Marshaller.class,
- InvocationContextContainer.class})
+ InvocationContextContainer.class, ObjectStreamPool.class})
public class EmptyConstructorFactory extends ComponentFactory
{
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -389,7 +389,7 @@
// Object marshalledNode = ois.readObject();
// deserialize result
- Map<Object, Object> oldNode = (Map<Object, Object>) unmarshall(is);
+ Map<Object, Object> oldNode = (Map<Object, Object>) unmarshall(is);
return oldNode;
}
catch (Exception e)
@@ -454,7 +454,7 @@
{
// a hack to handles the incomp. of SQL server jdbc driver prior to SQL SERVER 2005
if (driverName != null && (driverName.contains("SQLSERVER")
- || driverName.contains("POSTGRESQL")))
+ || driverName.contains("POSTGRESQL")))
{
ps.setNull(2, Types.LONGVARBINARY);
}
@@ -535,7 +535,8 @@
ps.setString(2, name.toString());
- /*int rows = */ps.executeUpdate();
+ /*int rows = */
+ ps.executeUpdate();
// if (rows != 1)
// {
// throw new IllegalStateException("Expected one updated row but got " + rows);
@@ -704,7 +705,7 @@
protected byte[] marshall(Object obj) throws Exception
{
- return getMarshaller().objectToByteBuffer(obj);
+ return getMarshaller().objectToByteBuffer(obj, true);
}
private static String toUpperCase(String s)
@@ -723,10 +724,11 @@
{
@Override
- public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+ public Set<java.util.Map.Entry<Object, Object>> entrySet()
+ {
throw new UnsupportedOperationException();
- }
-
+ }
+
};
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -6,7 +6,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
import org.jboss.cache.lock.StripedLock;
-import org.jboss.cache.marshall.ObjectSerializationFactory;
+import org.jboss.util.stream.MarshalledValueInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -502,7 +502,7 @@
protected Object unmarshall(File from) throws Exception
{
FileInputStream fileIn = new FileInputStream(from);
- ObjectInputStream input = ObjectSerializationFactory.createObjectInputStream(fileIn);
+ ObjectInputStream input = new MarshalledValueInputStream(fileIn);
Object unmarshalledObj = getMarshaller().objectFromObjectStream(input);
input.close();
return unmarshalledObj;
@@ -511,7 +511,7 @@
protected void marshall(Object obj, File to) throws Exception
{
FileOutputStream fileOut = new FileOutputStream(to);
- ObjectOutputStream output = ObjectSerializationFactory.createObjectOutputStream(fileOut);
+ ObjectOutputStream output = new ObjectOutputStream(fileOut);
getMarshaller().objectToObjectStream(obj, output);
output.close();
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -68,33 +68,22 @@
// implement the basic contract set in RPcDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception
{
+ return objectToByteBuffer(obj, false);
+ }
+
+ public byte[] objectToByteBuffer(Object obj, boolean b) throws Exception
+ {
throw new RuntimeException("Needs to be overridden!");
-// ByteArrayOutputStream baos = new ByteArrayOutputStream();
-// ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
-// objectToObjectStream(obj, out);
-// out.close();
-// return baos.toByteArray();
}
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
- //ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
- //return objectFromObjectStream(in);
}
public Object objectFromStream(InputStream in) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
- // by default just create an OIS from this IS and pass in to the relevant method
-// if (in instanceof ObjectInputStream)
-// {
-// return objectFromObjectStream((ObjectInputStream) in);
-// }
-// else
-// {
-// return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
-// }
}
/**
Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,38 +0,0 @@
-package org.jboss.cache.marshall;
-
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * Standard Java implementation of ObjectStreamFactory
- *
- * @author Clebert Suconic
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- * @since 1.4.1
- */
-class JavaObjectStreamFactory implements ObjectStreamFactory
-{
-
- public ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException
- {
- ByteArrayInputStream in = new ByteArrayInputStream(bytes);
- return new MarshalledValueInputStream(in);
- }
-
- public ObjectInputStream createObjectInputStream(InputStream in) throws IOException
- {
- return new MarshalledValueInputStream(in);
- }
-
- public ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException
- {
- return new ObjectOutputStream(out);
- }
-
-}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -68,4 +68,14 @@
* @throws Exception
*/
void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception;
+
+ /**
+ * Same as {@link #objectToByteBuffer(Object)} except that you can optionally specify to write object stream headers.
+ * Useful if you intend to deserialize the stream with non-pooled input streams.
+ *
+ * @param o
+ * @param writeStreamHeaders
+ * @return
+ */
+ byte[] objectToByteBuffer(Object o, boolean writeStreamHeaders) throws Exception;
}
Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.cache.marshall;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * Factory class for creating object output and inut streams, to allow for multiple mechanisms of serialization.
- * Java serialization is the only supported mechanism at this point.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- */
-public class ObjectSerializationFactory
-{
- static ObjectStreamFactory factory = new JavaObjectStreamFactory();
-
- /*
- static
- {
- // start with the NEW property
- String propString = System.getProperty("jboss.serialization");
- if (propString == null)
- {
- // and now check legacy
- propString = System.getProperty("serialization.jboss");
- if (propString != null)
- log.info("The system property 'serialization.jboss' is deprecated and may be removed from future releases. Please use 'jboss.serialization' instead.");
- }
- useJBossSerialization = false; // default.
- if (propString != null) useJBossSerialization = Boolean.valueOf(propString);
-
- try
- {
- if (useJBossSerialization)
- {
- factory = (ObjectStreamFactory) Class.forName("org.jboss.cache.marshall.JBossObjectStreamFactory").newInstance();
- }
- }
- catch (Exception e)
- {
- log.error("Unable to load JBossObjectStreamFactory. Perhaps jboss-serialization jar not loaded?", e);
- log.error("Falling back to java serialization.");
- }
- }
- */
-
- public static ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException
- {
- return factory.createObjectOutputStream(out);
- }
-
- public static ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException
- {
- return factory.createObjectInputStream(bytes);
- }
-
- public static ObjectInputStream createObjectInputStream(InputStream in) throws IOException
- {
- return factory.createObjectInputStream(in);
- }
-
-// public static boolean useJBossSerialization()
-// {
-// return useJBossSerialization;
-// }
-}
Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,23 +0,0 @@
-package org.jboss.cache.marshall;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * ObjectStreamFactory
- *
- * @author Clebert Suconic
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- * @since 1.4.1
- */
-public interface ObjectStreamFactory
-{
- public ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException;
-
- public ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException;
-
- public ObjectInputStream createObjectInputStream(InputStream in) throws IOException;
-}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -8,7 +8,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
@@ -16,7 +15,10 @@
import org.jboss.cache.marshall.io.ObjectStreamPool;
import org.jboss.cache.marshall.io.ReusableObjectOutputStream;
import org.jboss.cache.util.Util;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -48,9 +50,10 @@
ObjectStreamPool pool;
@Inject
- void injectComponentRegistry(ComponentRegistry componentRegistry)
+ void injectComponents(ComponentRegistry componentRegistry, ObjectStreamPool pool)
{
this.componentRegistry = componentRegistry;
+ this.pool = pool;
}
@Start
@@ -99,15 +102,6 @@
log.debug("Started with version " + replVersionString + " and versionInt " + versionInt);
log.debug("Using default marshaller class " + this.defaultMarshaller.getClass());
}
-
- try
- {
- pool = new ObjectStreamPool(25, 25);
- }
- catch (Exception e)
- {
- throw new CacheException(e);
- }
}
protected int getCustomMarshallerVersionInt()
@@ -158,9 +152,29 @@
}
}
- public byte[] objectToByteBuffer(Object obj) throws Exception
+ private byte[] useNonPooledStream(Object obj) throws Exception
{
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+
+ out.writeShort(versionInt);
+ if (trace) log.trace("Wrote version " + versionInt);
+
+ //now marshall the contents of the object
+ defaultMarshaller.objectToObjectStream(obj, out);
+ out.close();
+
+ // and return bytes.
+ return baos.toByteArray();
+ }
+
+ @Override
+ public byte[] objectToByteBuffer(Object obj, boolean writeHeader) throws Exception
+ {
+ if (writeHeader) return useNonPooledStream(obj);
+
ReusableObjectOutputStream out = pool.getOutputStream();
+
try
{
out.writeShort(versionInt);
@@ -178,6 +192,7 @@
}
}
+ @Override
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
Marshaller marshaller;
@@ -209,10 +224,35 @@
public Object objectFromStream(InputStream is) throws Exception
{
- int avbl = is.available();
- byte[] bytes = new byte[avbl];
- is.read(bytes, 0, avbl);
- return objectFromByteBuffer(bytes);
+ if (is instanceof ByteArrayInputStream)
+ {
+ int avbl = is.available();
+ byte[] bytes = new byte[avbl];
+ is.read(bytes, 0, avbl);
+ return objectFromByteBuffer(bytes);
+ }
+ else
+ {
+ // actually attempt to "stream" this stuff. We need to revert to an old-fashioned Object Input Stream since
+ // we don't have a reusable implementation for non-byte-backed streams as yet.
+ short versionId;
+ Marshaller marshaller;
+ ObjectInputStream in = new MarshalledValueInputStream(is);
+ try
+ {
+ versionId = in.readShort();
+ if (trace) log.trace("Read version " + versionId);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to read version id from first two bytes of stream, barfing.");
+ throw e;
+ }
+
+ marshaller = getMarshaller(versionId);
+
+ return marshaller.objectFromObjectStream(in);
+ }
}
public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,5 +1,11 @@
package org.jboss.cache.marshall.io;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.BlockingQueue;
@@ -15,22 +21,48 @@
{
BlockingQueue<ReusableObjectOutputStream> outputStreams;
BlockingQueue<ReusableObjectInputStream> inputStreams;
+ int numOutputStreams = 1, numInputStreams = 1;
+ Configuration configuration;
+ Log log = LogFactory.getLog(ObjectStreamPool.class);
- /**
- * Constructs the pool with a fixed number of object output and input streams
- *
- * @param numOutputStreams number of object output streams
- * @param numInputStreams number of object input streams
- * @throws InterruptedException
- * @throws IOException
- */
- public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
+ public ObjectStreamPool()
{
+ // for construction by the IOC framework
+ }
+
+ @Inject
+ public void injectDependencies(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
+ @Start
+ public void start() throws InterruptedException, IOException
+ {
+ if (configuration != null)
+ {
+ numInputStreams = configuration.getObjectInputStreamPoolSize();
+ numOutputStreams = configuration.getObjectOutputStreamPoolSize();
+ if (log.isDebugEnabled())
+ log.debug("Using stream sizes from configuration as input: " + numInputStreams + " and output: " + numOutputStreams);
+ }
+
outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+ long start = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating " + numOutputStreams + " ReusableObjectOutputStreams and " + numInputStreams + " ReusableObjectInputStreams for pool");
+ start = System.currentTimeMillis();
+ }
for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
for (int i = 0; i < numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+ if (log.isDebugEnabled())
+ {
+ log.debug("Initialised object stream pool in " + (System.currentTimeMillis() - start) + " millis");
+ }
+
}
/**
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -53,22 +53,28 @@
public void testVersionHeaderDefaultCurrent() throws Exception
{
+ ObjectStreamPool pool = new ObjectStreamPool();
+ pool.start();
+
VersionAwareMarshaller marshaller = createVAM(Version.getVersionString(Version.getVersionShort()));
+ marshaller.injectComponents(null, pool);
+
byte[] bytes = marshaller.objectToByteBuffer("Hello");
- ObjectStreamPool pool = new ObjectStreamPool(1, 1);
-
ObjectInputStream in = pool.getInputStream(bytes);
assertEquals("Version header short should be '21'", 21, in.readShort());
}
public void testVersionHeader200() throws Exception
{
+ ObjectStreamPool pool = new ObjectStreamPool();
+ pool.start();
+
VersionAwareMarshaller marshaller = createVAM("2.0.0.GA");
+ marshaller.injectComponents(null, pool);
+
byte[] bytes = marshaller.objectToByteBuffer("Hello");
- ObjectStreamPool pool = new ObjectStreamPool(1, 1);
-
ObjectInputStream in = pool.getInputStream(bytes);
assertEquals("Version header short should be '20'", 20, in.readShort());
}
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-06 14:20:40 UTC (rev 5316)
@@ -24,7 +24,10 @@
@BeforeTest
public void setUp() throws IOException, InterruptedException
{
- pool = new ObjectStreamPool(1, 1);
+ pool = new ObjectStreamPool();
+ pool.numInputStreams = 1;
+ pool.numOutputStreams = 1;
+ pool.start();
}
@AfterMethod
@@ -240,6 +243,22 @@
assert ois.readShort() == 6;
}
+ public void testLargeObject() throws Exception
+ {
+ oos = pool.getOutputStream();
+ StringBuilder builder = new StringBuilder();
+ int targetSize = 1 << 22; // 4MB !!
+ System.out.println("target size: " + targetSize);
+ for (int i = 0; i < targetSize; i++) builder.append("X");
+ String fourMegString = builder.toString();
+ oos.writeObject(fourMegString);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+
+ assert ois.readObject().equals(fourMegString);
+ }
+
public void testReusableOutputStreamArraySize() throws InterruptedException, IOException
{
oos = pool.getOutputStream();
More information about the jbosscache-commits
mailing list