[jbosscache-commits] JBoss Cache SVN: r5316 - in core/trunk/src: main/java/org/jboss/cache/config and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Feb 6 09:20:41 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-02-06 09:20:40 -0500 (Wed, 06 Feb 2008)
New Revision: 5316

Removed:
   core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java
   core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java
   core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java
Modified:
   core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
   core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Log:
Documented stream pool sizes, fixed more tests

Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml	2008-02-06 14:20:40 UTC (rev 5316)
@@ -708,6 +708,34 @@
                      </para>
                   </entry>
                </row>
+
+               <row>
+                  <entry>
+                     <para>ObjectInputStreamPoolSize and ObjectOutputStreamPoolSize</para>
+                  </entry>
+
+                  <entry>
+                     <para>
+                        Since JBoss Cache 2.1.0, object input and output streams - used to serialize and deserialize RPC
+                        calls in a cluster - are pooled to reduce the overhead of constructing such streams. They are
+                        reused
+                        by making use of special resettable stream implementations.
+                     </para>
+                     <para>
+                        by default, these stream pools are set at
+                        <literal>50</literal>
+                        objects each. You could increase or decrease the pool
+                        size if, while profiling, you see a lot of threads blocking on
+                        <literal>ObjectStreamPool.getInputStream()</literal>
+                        or<literal>ObjectStreamPool.getOutputStream()</literal>. In general, having more streams is
+                        better
+                        than having fewer than needed. Based on your application, make sure you have more streams
+                        available
+                        than number of threads you expect to concurrently write to the cache.
+                     </para>
+                  </entry>
+               </row>
+
             </tbody>
          </tgroup>
       </informaltable>

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -187,6 +187,8 @@
    private String marshallerClass;
    private ShutdownHookBehavior shutdownHookBehavior = ShutdownHookBehavior.DEFAULT;
    private boolean useLazyDeserialization = false;
+   private int objectInputStreamPoolSize = 50;
+   private int objectOutputStreamPoolSize = 50;
 
    // ------------------------------------------------------------------------------------------------------------
    //   SETTERS - MAKE SURE ALL SETTERS PERFORM testImmutability()!!!
@@ -488,9 +490,34 @@
 
    public void setUseLazyDeserialization(boolean useLazyDeserialization)
    {
+      testImmutability("useLazyDeserialization");
       this.useLazyDeserialization = useLazyDeserialization;
    }
 
+   /**
+    * Initialises the size of the object input stream pool size, which defaults to 50.
+    *
+    * @param objectInputStreamPoolSize
+    * @since 2.1.0
+    */
+   public void setObjectInputStreamPoolSize(int objectInputStreamPoolSize)
+   {
+      testImmutability("objectInputStreamPoolSize");
+      this.objectInputStreamPoolSize = objectInputStreamPoolSize;
+   }
+
+   /**
+    * Initialises the size of the object output stream pool size, which defaults to 50.
+    *
+    * @param objectOutputStreamPoolSize
+    * @since 2.1.0
+    */
+   public void setObjectOutputStreamPoolSize(int objectOutputStreamPoolSize)
+   {
+      testImmutability("objectOutputStreamPoolSize");
+      this.objectOutputStreamPoolSize = objectOutputStreamPoolSize;
+   }
+
    // ------------------------------------------------------------------------------------------------------------
    //   GETTERS
    // ------------------------------------------------------------------------------------------------------------
@@ -680,6 +707,25 @@
    }
 
    /**
+    * @return the size of he object input stream pool
+    * @since 2.1.0
+    */
+   public int getObjectInputStreamPoolSize()
+   {
+      return objectInputStreamPoolSize;
+   }
+
+   /**
+    * @return the size of he object output stream pool
+    * @since 2.1.0
+    */
+   public int getObjectOutputStreamPoolSize()
+   {
+      return objectOutputStreamPoolSize;
+   }
+
+
+   /**
     * Returns a {@link java.net.URL} to a default JGroups configuration file.
     *
     * @return a default JGroups config file
@@ -699,92 +745,93 @@
    //   OVERRIDDEN METHODS
    // ------------------------------------------------------------------------------------------------------------
 
-
+   @Override
    public boolean equals(Object o)
    {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
 
-      final Configuration that = (Configuration) o;
+      Configuration that = (Configuration) o;
 
+      if (exposeManagementStatistics != that.exposeManagementStatistics) return false;
       if (fetchInMemoryState != that.fetchInMemoryState) return false;
       if (inactiveOnStartup != that.inactiveOnStartup) return false;
-      if (stateRetrievalTimeout != that.stateRetrievalTimeout) return false;
       if (lockAcquisitionTimeout != that.lockAcquisitionTimeout) return false;
+      if (lockParentForChildInsertRemove != that.lockParentForChildInsertRemove) return false;
       if (nodeLockingOptimistic != that.nodeLockingOptimistic) return false;
+      if (objectInputStreamPoolSize != that.objectInputStreamPoolSize) return false;
+      if (objectOutputStreamPoolSize != that.objectOutputStreamPoolSize) return false;
       if (replQueueInterval != that.replQueueInterval) return false;
       if (replQueueMaxElements != that.replQueueMaxElements) return false;
       if (replicationVersion != that.replicationVersion) return false;
+      if (stateRetrievalTimeout != that.stateRetrievalTimeout) return false;
       if (syncCommitPhase != that.syncCommitPhase) return false;
       if (syncReplTimeout != that.syncReplTimeout) return false;
       if (syncRollbackPhase != that.syncRollbackPhase) return false;
-      if (exposeManagementStatistics != that.exposeManagementStatistics) return false;
+      if (useLazyDeserialization != that.useLazyDeserialization) return false;
       if (useRegionBasedMarshalling != that.useRegionBasedMarshalling) return false;
       if (useReplQueue != that.useReplQueue) return false;
+      if (usingMultiplexer != that.usingMultiplexer) return false;
       if (buddyReplicationConfig != null ? !buddyReplicationConfig.equals(that.buddyReplicationConfig) : that.buddyReplicationConfig != null)
-      {
          return false;
-      }
       if (cacheLoaderConfig != null ? !cacheLoaderConfig.equals(that.cacheLoaderConfig) : that.cacheLoaderConfig != null)
-      {
          return false;
-      }
       if (cacheMode != that.cacheMode) return false;
       if (clusterConfig != null ? !clusterConfig.equals(that.clusterConfig) : that.clusterConfig != null) return false;
       if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false;
       if (evictionConfig != null ? !evictionConfig.equals(that.evictionConfig) : that.evictionConfig != null)
-      {
          return false;
-      }
       if (isolationLevel != that.isolationLevel) return false;
+      if (marshaller != null ? !marshaller.equals(that.marshaller) : that.marshaller != null) return false;
+      if (marshallerClass != null ? !marshallerClass.equals(that.marshallerClass) : that.marshallerClass != null)
+         return false;
       if (muxStackName != null ? !muxStackName.equals(that.muxStackName) : that.muxStackName != null) return false;
       if (nodeLockingScheme != that.nodeLockingScheme) return false;
+      if (runtimeConfig != null ? !runtimeConfig.equals(that.runtimeConfig) : that.runtimeConfig != null) return false;
+      if (shutdownHookBehavior != that.shutdownHookBehavior) return false;
       if (transactionManagerLookupClass != null ? !transactionManagerLookupClass.equals(that.transactionManagerLookupClass) : that.transactionManagerLookupClass != null)
-      {
          return false;
-      }
-      if (!safeEquals(runtimeConfig, that.runtimeConfig))
-      {
-         return false;
-      }
 
-      if (!safeEquals(marshallerClass, that.marshallerClass)) return false;
-
-      if (lockParentForChildInsertRemove != that.lockParentForChildInsertRemove) return false;
-
       return true;
    }
 
+   @Override
    public int hashCode()
    {
-      int result = 17;
-      result = 29 * result + (clusterName != null ? clusterName.hashCode() : 0);
-      result = 29 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
-      result = 29 * result + (useReplQueue ? 1 : 0);
-      result = 29 * result + replQueueMaxElements;
-      result = 29 * result + (int) (replQueueInterval ^ (replQueueInterval >>> 32));
-      result = 29 * result + (exposeManagementStatistics ? 1 : 0);
-      result = 29 * result + (fetchInMemoryState ? 1 : 0);
-      result = 29 * result + (int) replicationVersion;
-      result = 29 * result + (int) (lockAcquisitionTimeout ^ (lockAcquisitionTimeout >>> 32));
-      result = 29 * result + (int) (syncReplTimeout ^ (syncReplTimeout >>> 32));
-      result = 29 * result + (cacheMode != null ? cacheMode.hashCode() : 0);
-      result = 29 * result + (inactiveOnStartup ? 1 : 0);
-      result = 29 * result + (int) (stateRetrievalTimeout ^ (stateRetrievalTimeout >>> 32));
-      result = 29 * result + (isolationLevel != null ? isolationLevel.hashCode() : 0);
-      result = 29 * result + (evictionConfig != null ? evictionConfig.hashCode() : 0);
-      result = 29 * result + (useRegionBasedMarshalling ? 1 : 0);
-      result = 29 * result + (transactionManagerLookupClass != null ? transactionManagerLookupClass.hashCode() : 0);
-      result = 29 * result + (cacheLoaderConfig != null ? cacheLoaderConfig.hashCode() : 0);
-      result = 29 * result + (syncCommitPhase ? 1 : 0);
-      result = 29 * result + (syncRollbackPhase ? 1 : 0);
-      result = 29 * result + (buddyReplicationConfig != null ? buddyReplicationConfig.hashCode() : 0);
-      result = 29 * result + (nodeLockingOptimistic ? 1 : 0);
-      result = 29 * result + (nodeLockingScheme != null ? nodeLockingScheme.hashCode() : 0);
-      result = 29 * result + (muxStackName != null ? muxStackName.hashCode() : 0);
-      result = 29 * result + (runtimeConfig != null ? runtimeConfig.hashCode() : 0);
-      result = 29 * result + (marshallerClass != null ? marshallerClass.hashCode() : 0);
-      result = 29 * result + (lockParentForChildInsertRemove ? 1 : 0);
+      int result;
+      result = (marshaller != null ? marshaller.hashCode() : 0);
+      result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
+      result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
+      result = 31 * result + (useReplQueue ? 1 : 0);
+      result = 31 * result + replQueueMaxElements;
+      result = 31 * result + (int) (replQueueInterval ^ (replQueueInterval >>> 32));
+      result = 31 * result + (exposeManagementStatistics ? 1 : 0);
+      result = 31 * result + (fetchInMemoryState ? 1 : 0);
+      result = 31 * result + (int) replicationVersion;
+      result = 31 * result + (int) (lockAcquisitionTimeout ^ (lockAcquisitionTimeout >>> 32));
+      result = 31 * result + (int) (syncReplTimeout ^ (syncReplTimeout >>> 32));
+      result = 31 * result + (cacheMode != null ? cacheMode.hashCode() : 0);
+      result = 31 * result + (inactiveOnStartup ? 1 : 0);
+      result = 31 * result + (int) (stateRetrievalTimeout ^ (stateRetrievalTimeout >>> 32));
+      result = 31 * result + (isolationLevel != null ? isolationLevel.hashCode() : 0);
+      result = 31 * result + (lockParentForChildInsertRemove ? 1 : 0);
+      result = 31 * result + (evictionConfig != null ? evictionConfig.hashCode() : 0);
+      result = 31 * result + (useRegionBasedMarshalling ? 1 : 0);
+      result = 31 * result + (transactionManagerLookupClass != null ? transactionManagerLookupClass.hashCode() : 0);
+      result = 31 * result + (cacheLoaderConfig != null ? cacheLoaderConfig.hashCode() : 0);
+      result = 31 * result + (syncCommitPhase ? 1 : 0);
+      result = 31 * result + (syncRollbackPhase ? 1 : 0);
+      result = 31 * result + (buddyReplicationConfig != null ? buddyReplicationConfig.hashCode() : 0);
+      result = 31 * result + (nodeLockingOptimistic ? 1 : 0);
+      result = 31 * result + (nodeLockingScheme != null ? nodeLockingScheme.hashCode() : 0);
+      result = 31 * result + (muxStackName != null ? muxStackName.hashCode() : 0);
+      result = 31 * result + (usingMultiplexer ? 1 : 0);
+      result = 31 * result + (runtimeConfig != null ? runtimeConfig.hashCode() : 0);
+      result = 31 * result + (marshallerClass != null ? marshallerClass.hashCode() : 0);
+      result = 31 * result + (shutdownHookBehavior != null ? shutdownHookBehavior.hashCode() : 0);
+      result = 31 * result + (useLazyDeserialization ? 1 : 0);
+      result = 31 * result + objectInputStreamPoolSize;
+      result = 31 * result + objectOutputStreamPoolSize;
       return result;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -8,6 +8,7 @@
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jboss.cache.marshall.io.ObjectStreamPool;
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.remoting.jgroups.CacheMessageListener;
 import org.jboss.cache.statetransfer.StateTransferManager;
@@ -21,7 +22,7 @@
  */
 @DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class,
       CacheMessageListener.class, CacheLoaderManager.class, RemoteCacheInvocationDelegate.class, Marshaller.class,
-      InvocationContextContainer.class})
+      InvocationContextContainer.class, ObjectStreamPool.class})
 public class EmptyConstructorFactory extends ComponentFactory
 {
    @Override

Modified: core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -389,7 +389,7 @@
                   //                 Object marshalledNode = ois.readObject();
 
                   // deserialize result
-            	  Map<Object, Object> oldNode = (Map<Object, Object>) unmarshall(is); 
+                  Map<Object, Object> oldNode = (Map<Object, Object>) unmarshall(is);
                   return oldNode;
                }
                catch (Exception e)
@@ -454,7 +454,7 @@
          {
             // a hack to handles the incomp. of SQL server jdbc driver prior to SQL SERVER 2005
             if (driverName != null && (driverName.contains("SQLSERVER")
-                                       || driverName.contains("POSTGRESQL")))
+                  || driverName.contains("POSTGRESQL")))
             {
                ps.setNull(2, Types.LONGVARBINARY);
             }
@@ -535,7 +535,8 @@
 
          ps.setString(2, name.toString());
 
-         /*int rows = */ps.executeUpdate();
+         /*int rows = */
+         ps.executeUpdate();
          //         if (rows != 1)
          //         {
          //            throw new IllegalStateException("Expected one updated row but got " + rows);
@@ -704,7 +705,7 @@
 
    protected byte[] marshall(Object obj) throws Exception
    {
-      return getMarshaller().objectToByteBuffer(obj);
+      return getMarshaller().objectToByteBuffer(obj, true);
    }
 
    private static String toUpperCase(String s)
@@ -723,10 +724,11 @@
    {
 
       @Override
-	  public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+      public Set<java.util.Map.Entry<Object, Object>> entrySet()
+      {
          throw new UnsupportedOperationException();
-	  }
-   
+      }
+
    };
 
 }

Modified: core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -6,7 +6,7 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
 import org.jboss.cache.lock.StripedLock;
-import org.jboss.cache.marshall.ObjectSerializationFactory;
+import org.jboss.util.stream.MarshalledValueInputStream;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -502,7 +502,7 @@
    protected Object unmarshall(File from) throws Exception
    {
       FileInputStream fileIn = new FileInputStream(from);
-      ObjectInputStream input = ObjectSerializationFactory.createObjectInputStream(fileIn);
+      ObjectInputStream input = new MarshalledValueInputStream(fileIn);
       Object unmarshalledObj = getMarshaller().objectFromObjectStream(input);
       input.close();
       return unmarshalledObj;
@@ -511,7 +511,7 @@
    protected void marshall(Object obj, File to) throws Exception
    {
       FileOutputStream fileOut = new FileOutputStream(to);
-      ObjectOutputStream output = ObjectSerializationFactory.createObjectOutputStream(fileOut);
+      ObjectOutputStream output = new ObjectOutputStream(fileOut);
       getMarshaller().objectToObjectStream(obj, output);
       output.close();
    }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -68,33 +68,22 @@
    // implement the basic contract set in RPcDispatcher.AbstractMarshaller
    public byte[] objectToByteBuffer(Object obj) throws Exception
    {
+      return objectToByteBuffer(obj, false);
+   }
+
+   public byte[] objectToByteBuffer(Object obj, boolean b) throws Exception
+   {
       throw new RuntimeException("Needs to be overridden!");
-//      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-//      ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
-//      objectToObjectStream(obj, out);
-//      out.close();
-//      return baos.toByteArray();
    }
 
    public Object objectFromByteBuffer(byte[] bytes) throws Exception
    {
       throw new RuntimeException("Needs to be overridden!");
-      //ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
-      //return objectFromObjectStream(in);
    }
 
    public Object objectFromStream(InputStream in) throws Exception
    {
       throw new RuntimeException("Needs to be overridden!");
-      // by default just create an OIS from this IS and pass in to the relevant method
-//      if (in instanceof ObjectInputStream)
-//      {
-//         return objectFromObjectStream((ObjectInputStream) in);
-//      }
-//      else
-//      {
-//         return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
-//      }
    }
 
    /**

Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/JavaObjectStreamFactory.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,38 +0,0 @@
-package org.jboss.cache.marshall;
-
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * Standard Java implementation of ObjectStreamFactory
- *
- * @author Clebert Suconic
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- * @since 1.4.1
- */
-class JavaObjectStreamFactory implements ObjectStreamFactory
-{
-
-   public ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException
-   {
-      ByteArrayInputStream in = new ByteArrayInputStream(bytes);
-      return new MarshalledValueInputStream(in);
-   }
-
-   public ObjectInputStream createObjectInputStream(InputStream in) throws IOException
-   {
-      return new MarshalledValueInputStream(in);
-   }
-
-   public ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException
-   {
-      return new ObjectOutputStream(out);
-   }
-
-}

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -68,4 +68,14 @@
     * @throws Exception
     */
    void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception;
+
+   /**
+    * Same as {@link #objectToByteBuffer(Object)} except that you can optionally specify to write object stream headers.
+    * Useful if you intend to deserialize the stream with non-pooled input streams.
+    *
+    * @param o
+    * @param writeStreamHeaders
+    * @return
+    */
+   byte[] objectToByteBuffer(Object o, boolean writeStreamHeaders) throws Exception;
 }

Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ObjectSerializationFactory.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.cache.marshall;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * Factory class for creating object output and inut streams, to allow for multiple mechanisms of serialization.
- * Java serialization is the only supported mechanism at this point.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- */
-public class ObjectSerializationFactory
-{
-   static ObjectStreamFactory factory = new JavaObjectStreamFactory();
-
-   /*
-   static
-   {
-      // start with the NEW property
-      String propString = System.getProperty("jboss.serialization");
-      if (propString == null)
-      {
-         // and now check legacy
-         propString = System.getProperty("serialization.jboss");
-         if (propString != null)
-            log.info("The system property 'serialization.jboss' is deprecated and may be removed from future releases.  Please use 'jboss.serialization' instead.");
-      }      
-      useJBossSerialization = false; // default.
-      if (propString != null) useJBossSerialization = Boolean.valueOf(propString);
-
-      try
-      {
-         if (useJBossSerialization)
-         {
-            factory = (ObjectStreamFactory) Class.forName("org.jboss.cache.marshall.JBossObjectStreamFactory").newInstance();
-         }
-      }
-      catch (Exception e)
-      {
-         log.error("Unable to load JBossObjectStreamFactory.  Perhaps jboss-serialization jar not loaded?", e);
-         log.error("Falling back to java serialization.");
-      }
-   }
-   */
-
-   public static ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException
-   {
-      return factory.createObjectOutputStream(out);
-   }
-
-   public static ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException
-   {
-      return factory.createObjectInputStream(bytes);
-   }
-
-   public static ObjectInputStream createObjectInputStream(InputStream in) throws IOException
-   {
-      return factory.createObjectInputStream(in);
-   }
-
-//   public static boolean useJBossSerialization()
-//   {
-//      return useJBossSerialization;
-//   }
-}

Deleted: core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ObjectStreamFactory.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,23 +0,0 @@
-package org.jboss.cache.marshall;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-
-/**
- * ObjectStreamFactory
- *
- * @author Clebert Suconic
- * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
- * @since 1.4.1
- */
-public interface ObjectStreamFactory
-{
-   public ObjectOutputStream createObjectOutputStream(OutputStream out) throws IOException;
-
-   public ObjectInputStream createObjectInputStream(byte[] bytes) throws IOException;
-
-   public ObjectInputStream createObjectInputStream(InputStream in) throws IOException;
-}

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -8,7 +8,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
@@ -16,7 +15,10 @@
 import org.jboss.cache.marshall.io.ObjectStreamPool;
 import org.jboss.cache.marshall.io.ReusableObjectOutputStream;
 import org.jboss.cache.util.Util;
+import org.jboss.util.stream.MarshalledValueInputStream;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -48,9 +50,10 @@
    ObjectStreamPool pool;
 
    @Inject
-   void injectComponentRegistry(ComponentRegistry componentRegistry)
+   void injectComponents(ComponentRegistry componentRegistry, ObjectStreamPool pool)
    {
       this.componentRegistry = componentRegistry;
+      this.pool = pool;
    }
 
    @Start
@@ -99,15 +102,6 @@
          log.debug("Started with version " + replVersionString + " and versionInt " + versionInt);
          log.debug("Using default marshaller class " + this.defaultMarshaller.getClass());
       }
-
-      try
-      {
-         pool = new ObjectStreamPool(25, 25);
-      }
-      catch (Exception e)
-      {
-         throw new CacheException(e);
-      }
    }
 
    protected int getCustomMarshallerVersionInt()
@@ -158,9 +152,29 @@
       }
    }
 
-   public byte[] objectToByteBuffer(Object obj) throws Exception
+   private byte[] useNonPooledStream(Object obj) throws Exception
    {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream out = new ObjectOutputStream(baos);
+
+      out.writeShort(versionInt);
+      if (trace) log.trace("Wrote version " + versionInt);
+
+      //now marshall the contents of the object
+      defaultMarshaller.objectToObjectStream(obj, out);
+      out.close();
+
+      // and return bytes.
+      return baos.toByteArray();
+   }
+
+   @Override
+   public byte[] objectToByteBuffer(Object obj, boolean writeHeader) throws Exception
+   {
+      if (writeHeader) return useNonPooledStream(obj);
+
       ReusableObjectOutputStream out = pool.getOutputStream();
+
       try
       {
          out.writeShort(versionInt);
@@ -178,6 +192,7 @@
       }
    }
 
+   @Override
    public Object objectFromByteBuffer(byte[] buf) throws Exception
    {
       Marshaller marshaller;
@@ -209,10 +224,35 @@
 
    public Object objectFromStream(InputStream is) throws Exception
    {
-      int avbl = is.available();
-      byte[] bytes = new byte[avbl];
-      is.read(bytes, 0, avbl);
-      return objectFromByteBuffer(bytes);
+      if (is instanceof ByteArrayInputStream)
+      {
+         int avbl = is.available();
+         byte[] bytes = new byte[avbl];
+         is.read(bytes, 0, avbl);
+         return objectFromByteBuffer(bytes);
+      }
+      else
+      {
+         // actually attempt to "stream" this stuff.  We need to revert to an old-fashioned Object Input Stream since
+         // we don't have a reusable implementation for non-byte-backed streams as yet.
+         short versionId;
+         Marshaller marshaller;
+         ObjectInputStream in = new MarshalledValueInputStream(is);
+         try
+         {
+            versionId = in.readShort();
+            if (trace) log.trace("Read version " + versionId);
+         }
+         catch (Exception e)
+         {
+            log.error("Unable to read version id from first two bytes of stream, barfing.");
+            throw e;
+         }
+
+         marshaller = getMarshaller(versionId);
+
+         return marshaller.objectFromObjectStream(in);
+      }
    }
 
    public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -1,5 +1,11 @@
 package org.jboss.cache.marshall.io;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.concurrent.BlockingQueue;
@@ -15,22 +21,48 @@
 {
    BlockingQueue<ReusableObjectOutputStream> outputStreams;
    BlockingQueue<ReusableObjectInputStream> inputStreams;
+   int numOutputStreams = 1, numInputStreams = 1;
+   Configuration configuration;
+   Log log = LogFactory.getLog(ObjectStreamPool.class);
 
-   /**
-    * Constructs the pool with a fixed number of object output and input streams
-    *
-    * @param numOutputStreams number of object output streams
-    * @param numInputStreams  number of object input streams
-    * @throws InterruptedException
-    * @throws IOException
-    */
-   public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
+   public ObjectStreamPool()
    {
+      // for construction by the IOC framework
+   }
+
+   @Inject
+   public void injectDependencies(Configuration configuration)
+   {
+      this.configuration = configuration;
+   }
+
+   @Start
+   public void start() throws InterruptedException, IOException
+   {
+      if (configuration != null)
+      {
+         numInputStreams = configuration.getObjectInputStreamPoolSize();
+         numOutputStreams = configuration.getObjectOutputStreamPoolSize();
+         if (log.isDebugEnabled())
+            log.debug("Using stream sizes from configuration as input: " + numInputStreams + " and output: " + numOutputStreams);
+      }
+
       outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
       inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
 
+      long start = 0;
+      if (log.isDebugEnabled())
+      {
+         log.debug("Creating " + numOutputStreams + " ReusableObjectOutputStreams and " + numInputStreams + " ReusableObjectInputStreams for pool");
+         start = System.currentTimeMillis();
+      }
       for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
       for (int i = 0; i < numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+      if (log.isDebugEnabled())
+      {
+         log.debug("Initialised object stream pool in " + (System.currentTimeMillis() - start) + " millis");
+      }
+
    }
 
    /**

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/VersionAwareMarshallerTest.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -53,22 +53,28 @@
 
    public void testVersionHeaderDefaultCurrent() throws Exception
    {
+      ObjectStreamPool pool = new ObjectStreamPool();
+      pool.start();
+
       VersionAwareMarshaller marshaller = createVAM(Version.getVersionString(Version.getVersionShort()));
+      marshaller.injectComponents(null, pool);
+
       byte[] bytes = marshaller.objectToByteBuffer("Hello");
 
-      ObjectStreamPool pool = new ObjectStreamPool(1, 1);
-
       ObjectInputStream in = pool.getInputStream(bytes);
       assertEquals("Version header short should be '21'", 21, in.readShort());
    }
 
    public void testVersionHeader200() throws Exception
    {
+      ObjectStreamPool pool = new ObjectStreamPool();
+      pool.start();
+
       VersionAwareMarshaller marshaller = createVAM("2.0.0.GA");
+      marshaller.injectComponents(null, pool);
+
       byte[] bytes = marshaller.objectToByteBuffer("Hello");
 
-      ObjectStreamPool pool = new ObjectStreamPool(1, 1);
-
       ObjectInputStream in = pool.getInputStream(bytes);
       assertEquals("Version header short should be '20'", 20, in.readShort());
    }

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	2008-02-06 13:14:45 UTC (rev 5315)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	2008-02-06 14:20:40 UTC (rev 5316)
@@ -24,7 +24,10 @@
    @BeforeTest
    public void setUp() throws IOException, InterruptedException
    {
-      pool = new ObjectStreamPool(1, 1);
+      pool = new ObjectStreamPool();
+      pool.numInputStreams = 1;
+      pool.numOutputStreams = 1;
+      pool.start();
    }
 
    @AfterMethod
@@ -240,6 +243,22 @@
       assert ois.readShort() == 6;
    }
 
+   public void testLargeObject() throws Exception
+   {
+      oos = pool.getOutputStream();
+      StringBuilder builder = new StringBuilder();
+      int targetSize = 1 << 22; // 4MB  !!
+      System.out.println("target size: " + targetSize);
+      for (int i = 0; i < targetSize; i++) builder.append("X");
+      String fourMegString = builder.toString();
+      oos.writeObject(fourMegString);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+
+      assert ois.readObject().equals(fourMegString);
+   }
+
    public void testReusableOutputStreamArraySize() throws InterruptedException, IOException
    {
       oos = pool.getOutputStream();




More information about the jbosscache-commits mailing list