[infinispan-commits] Infinispan SVN: r1932 - trunk/lucene-directory/src/main/java/org/infinispan/lucene.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jun 24 20:18:59 EDT 2010


Author: sannegrinovero
Date: 2010-06-24 20:18:59 -0400 (Thu, 24 Jun 2010)
New Revision: 1932

Added:
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
Removed:
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
Modified:
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
Log:
[ISPN-510] (Simplify InfinispanIndexIO) - trunk


Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-06-24 17:12:16 UTC (rev 1931)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-06-25 00:18:59 UTC (rev 1932)
@@ -48,8 +48,12 @@
  * @author Sanne Grinovero
  * @see org.infinispan.lucene.locking.TransactionalLockFactory
  */
-// todo add support for ConcurrentMergeSheduler
+// TODO add support for ConcurrentMergeScheduler
 public class InfinispanDirectory extends Directory {
+   
+   // used as default chunk size if not provided in conf
+   // each Lucene index segment is split into chunks of this size by default
+   public final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
 
    private static final Log log = LogFactory.getLog(InfinispanDirectory.class);
 
@@ -75,7 +79,7 @@
    }
 
    public InfinispanDirectory(Cache<CacheKey, Object> cache, String indexName, LockFactory lf) {
-      this(cache, indexName, lf, InfinispanIndexIO.DEFAULT_BUFFER_SIZE);
+      this(cache, indexName, lf, DEFAULT_BUFFER_SIZE);
    }
 
    public InfinispanDirectory(Cache<CacheKey, Object> cache, String indexName, int chunkSize) {
@@ -83,7 +87,7 @@
    }
 
    public InfinispanDirectory(Cache<CacheKey, Object> cache, String indexName) {
-      this(cache, indexName, new BaseLockFactory(cache, indexName), InfinispanIndexIO.DEFAULT_BUFFER_SIZE);
+      this(cache, indexName, new BaseLockFactory(cache, indexName), DEFAULT_BUFFER_SIZE);
    }
 
    public InfinispanDirectory(Cache<CacheKey, Object> cache) {
@@ -215,9 +219,9 @@
          Set<String> fileList = getFileList();
          fileList.add(name);
          cache.put(fileListCacheKey, fileList);
-         return new InfinispanIndexIO.InfinispanIndexOutput(cache, key, chunkSize, newFileMetadata);
+         return new InfinispanIndexOutput(cache, key, chunkSize, newFileMetadata);
       } else {
-         return new InfinispanIndexIO.InfinispanIndexOutput(cache, key, chunkSize, previous);
+         return new InfinispanIndexOutput(cache, key, chunkSize, previous);
       }
    }
 
@@ -234,7 +238,7 @@
     */
    public IndexInput openInput(String name) throws IOException {
       final FileCacheKey fileKey = new FileCacheKey(indexName, name);
-      return new InfinispanIndexIO.InfinispanIndexInput(cache, fileKey, chunkSize);
+      return new InfinispanIndexInput(cache, fileKey, chunkSize);
    }
 
    /**

Deleted: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java	2010-06-24 17:12:16 UTC (rev 1931)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java	2010-06-25 00:18:59 UTC (rev 1932)
@@ -1,299 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.lucene;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.infinispan.AdvancedCache;
-import org.infinispan.context.Flag;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-/**
- * Deal with input-output operations on Infinispan based Directory
- * 
- * @since 4.0
- * @author Lukasz Moren
- * @author Davide Di Somma
- * @see org.apache.lucene.store.Directory
- * @see org.apache.lucene.store.IndexInput
- * @see org.apache.lucene.store.IndexOutput
- */
-public class InfinispanIndexIO {
-
-   // used as default chunk size if not provided in conf
-   // each Lucene index segment is splitted into parts with default size defined here
-   public final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
-
-   private static byte[] getChunkFromPosition(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int pos, int bufferSize) {
-      CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), getChunkNumberFromPosition(pos,
-               bufferSize));
-      return (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
-   }
-   
-   public static byte[] getChunkFromPosition(ConcurrentHashMap<CacheKey, Object> localCache, FileCacheKey fileKey, int pos, int bufferSize) {
-      CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), getChunkNumberFromPosition(pos,
-               bufferSize));
-      return (byte[]) localCache.get(key);
-   }
-
-   private static int getPositionInBuffer(int pos, int bufferSize) {
-      return (pos % bufferSize);
-   }
-
-   private static int getChunkNumberFromPosition(int pos, int bufferSize) {
-      return ((pos) / (bufferSize));
-   }
-
-   /**
-    * Responsible for writing into <code>Directory</code>
-    */
-   public static class InfinispanIndexInput extends IndexInput {
-
-      private static final Log log = LogFactory.getLog(InfinispanIndexInput.class);
-
-      private final AdvancedCache<CacheKey, Object> cache;
-      private final FileMetadata file;
-      private final FileCacheKey fileKey;
-      private final int chunkSize;
-
-      private int currentBufferSize;
-      private byte[] buffer;
-      private int bufferPosition;
-      private int currentLoadedChunk = -1;
-
-      public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey) throws IOException {
-         this(cache, fileKey, InfinispanIndexIO.DEFAULT_BUFFER_SIZE);
-      }
-
-      public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int chunkSize) throws IOException {
-         this.cache = cache;
-         this.fileKey = fileKey;
-         this.chunkSize = chunkSize;
-
-         // get file header from file
-         this.file = (FileMetadata) cache.get(fileKey);
-
-         if (file == null) {
-            throw new IOException("Error loading medatada for index file: " + fileKey);
-         }
-
-         if (log.isDebugEnabled()) {
-            log.debug("Opened new IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
-         }
-      }
-
-      @Override
-      public byte readByte() throws IOException {
-         if (bufferPosition >= currentBufferSize) {
-            nextChunk();
-            bufferPosition = 0;
-         }
-         return buffer[bufferPosition++];
-       }
-      
-      @Override
-      public void readBytes(byte[] b, int offset, int len) throws IOException {
-         int bytesToRead = len;
-         if (buffer == null) {
-            nextChunk();
-         }
-         while (bytesToRead > 0) {
-            int bytesToCopy = Math.min(currentBufferSize - bufferPosition, bytesToRead);
-            System.arraycopy(buffer, bufferPosition, b, offset, bytesToCopy);
-            offset += bytesToCopy;
-            bytesToRead -= bytesToCopy;
-            bufferPosition += bytesToCopy;
-            if (bufferPosition >= currentBufferSize && bytesToRead > 0) {
-               nextChunk();
-               bufferPosition = 0;
-            }
-         }
-      }
-
-      @Override
-      public void close() throws IOException {
-         currentBufferSize = 0;
-         bufferPosition = 0;
-         currentLoadedChunk = -1;
-         buffer = null;
-         if (log.isDebugEnabled()) {
-            log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
-         }
-      }
-
-      public long getFilePointer() {
-         return ((long)currentLoadedChunk) * chunkSize + bufferPosition;
-      }
-
-      @Override
-      public void seek(long pos) throws IOException {
-         bufferPosition = (int)( pos % chunkSize );
-         int targetChunk = (int) (pos / chunkSize);
-         if (targetChunk != currentLoadedChunk) {
-            currentLoadedChunk = targetChunk;
-            setBufferToCurrentChunk();
-         }
-      }
-      
-      private void nextChunk() throws IOException {
-         currentLoadedChunk++;
-         setBufferToCurrentChunk();
-      }
-
-      private void setBufferToCurrentChunk() throws IOException {
-         CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
-         buffer = (byte[]) cache.get(key);
-         if (buffer == null) {
-            throw new IOException("Chunk value could not be found for key " + key);
-         }
-         currentBufferSize = buffer.length;
-      }
-
-      @Override
-      public long length() {
-         return file.getSize();
-      }
-      
-   }
-
-   /**
-    * Responsible for reading from <code>Directory</code>
-    */
-   public static class InfinispanIndexOutput extends IndexOutput {
-
-      private static final Log log = LogFactory.getLog(InfinispanIndexOutput.class);
-
-      private final int bufferSize;
-
-      private final AdvancedCache<CacheKey, Object> cache;
-      private final FileMetadata file;
-      private final FileCacheKey fileKey;
-
-      private byte[] buffer;
-      private int bufferPosition = 0;
-      private int filePosition = 0;
-      private int chunkNumber;
-
-      public InfinispanIndexOutput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
-         this.cache = cache;
-         this.fileKey = fileKey;
-         this.bufferSize = bufferSize;
-         this.buffer = new byte[this.bufferSize];
-         this.file = fileMetadata;
-         if (log.isDebugEnabled()) {
-            log.debug("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
-         }
-      }
-
-      private void newChunk() throws IOException {
-         flush();// save data first
-
-         // check if we have to create new chunk, or get already existing in cache for modification
-         if ((buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize)) == null) {
-            buffer = new byte[bufferSize];
-         }
-         bufferPosition = 0;
-      }
-
-      public void writeByte(byte b) throws IOException {
-         if (isNewChunkNeeded()) {
-            newChunk();
-         }
-         buffer[bufferPosition++] = b;
-         filePosition++;
-      }
-
-      public void writeBytes(byte[] b, int offset, int length) throws IOException {
-
-         int writedBytes = 0;
-         while (writedBytes < length) {
-            int pieceLength = Math.min(buffer.length - bufferPosition, length - writedBytes);
-            System.arraycopy(b, offset + writedBytes, buffer, bufferPosition, pieceLength);
-            bufferPosition += pieceLength;
-            filePosition += pieceLength;
-            writedBytes += pieceLength;
-            if (isNewChunkNeeded()) {
-               newChunk();
-            }
-         }
-      }
-
-      private boolean isNewChunkNeeded() {
-         return (bufferPosition == buffer.length);
-      }
-
-      public void flush() throws IOException {
-         // select right chunkNumber
-         chunkNumber = getChunkNumberFromPosition(filePosition - 1, bufferSize);
-         // and create distinct key for it
-         ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), chunkNumber);
-         // size changed, apply change to file header
-         file.touch();
-         if (file.getSize() < filePosition) {
-            file.setSize(filePosition);
-         }
-         cache.startBatch();
-         // add chunk to cache
-         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, buffer);
-         // override existing file header with new size and last time access
-         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
-         cache.endBatch(true);
-      }
-
-      public void close() throws IOException {
-         flush();
-         bufferPosition = 0;
-         filePosition = 0;
-         buffer = null;
-         if (log.isDebugEnabled()) {
-            log.debug("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
-         }
-         // cache.compact(); //TODO investigate about this
-      }
-
-      public long getFilePointer() {
-         return filePosition;
-      }
-
-      public void seek(long pos) throws IOException {
-         flush();
-
-         if (pos > file.getSize()) {
-            throw new IOException(fileKey.getFileName() + ": seeking past of the file");
-         }
-
-         buffer = getChunkFromPosition(cache, fileKey, (int) pos, bufferSize);
-         bufferPosition = getPositionInBuffer((int) pos, bufferSize);
-         filePosition = (int) pos;
-      }
-
-      public long length() throws IOException {
-         return file.getSize();
-      }
-
-   }
-
-}

Added: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	                        (rev 0)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-06-25 00:18:59 UTC (rev 1932)
@@ -0,0 +1,143 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+import org.infinispan.AdvancedCache;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * Responsible for reading from <code>InfinispanDirectory</code>
+ * 
+ * @since 4.0
+ * @author Sanne Grinovero
+ * @author Davide Di Somma
+ * @see org.apache.lucene.store.Directory
+ * @see org.apache.lucene.store.IndexInput
+ */
+public class InfinispanIndexInput extends IndexInput {
+
+   private static final Log log = LogFactory.getLog(InfinispanIndexInput.class);
+
+   private final AdvancedCache<CacheKey, Object> cache;
+   private final FileMetadata file;
+   private final FileCacheKey fileKey;
+   private final int chunkSize;
+
+   private int currentBufferSize;
+   private byte[] buffer;
+   private int bufferPosition;
+   private int currentLoadedChunk = -1;
+
+   public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int chunkSize) throws IOException {
+      this.cache = cache;
+      this.fileKey = fileKey;
+      this.chunkSize = chunkSize;
+
+      // load the file metadata (header) from the cache
+      this.file = (FileMetadata) cache.get(fileKey);
+
+      if (file == null) {
+         throw new IOException("Error loading medatada for index file: " + fileKey);
+      }
+
+      if (log.isDebugEnabled()) {
+         log.debug("Opened new IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      }
+   }
+
+   @Override
+   public byte readByte() throws IOException {
+      if (bufferPosition >= currentBufferSize) {
+         nextChunk();
+         bufferPosition = 0;
+      }
+      return buffer[bufferPosition++];
+    }
+   
+   @Override
+   public void readBytes(byte[] b, int offset, int len) throws IOException {
+      int bytesToRead = len;
+      if (buffer == null) {
+         nextChunk();
+      }
+      while (bytesToRead > 0) {
+         int bytesToCopy = Math.min(currentBufferSize - bufferPosition, bytesToRead);
+         System.arraycopy(buffer, bufferPosition, b, offset, bytesToCopy);
+         offset += bytesToCopy;
+         bytesToRead -= bytesToCopy;
+         bufferPosition += bytesToCopy;
+         if (bufferPosition >= currentBufferSize && bytesToRead > 0) {
+            nextChunk();
+            bufferPosition = 0;
+         }
+      }
+   }
+
+   @Override
+   public void close() throws IOException {
+      currentBufferSize = 0;
+      bufferPosition = 0;
+      currentLoadedChunk = -1;
+      buffer = null;
+      if (log.isDebugEnabled()) {
+         log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      }
+   }
+
+   public long getFilePointer() {
+      return ((long)currentLoadedChunk) * chunkSize + bufferPosition;
+   }
+
+   @Override
+   public void seek(long pos) throws IOException {
+      bufferPosition = (int)( pos % chunkSize );
+      int targetChunk = (int) (pos / chunkSize);
+      if (targetChunk != currentLoadedChunk) {
+         currentLoadedChunk = targetChunk;
+         setBufferToCurrentChunk();
+      }
+   }
+   
+   private void nextChunk() throws IOException {
+      currentLoadedChunk++;
+      setBufferToCurrentChunk();
+   }
+
+   private void setBufferToCurrentChunk() throws IOException {
+      CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+      buffer = (byte[]) cache.get(key);
+      if (buffer == null) {
+         throw new IOException("Chunk value could not be found for key " + key);
+      }
+      currentBufferSize = buffer.length;
+   }
+
+   @Override
+   public long length() {
+      return file.getSize();
+   }
+   
+}
\ No newline at end of file

Added: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	                        (rev 0)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-06-25 00:18:59 UTC (rev 1932)
@@ -0,0 +1,162 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexOutput;
+import org.infinispan.AdvancedCache;
+import org.infinispan.context.Flag;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * Responsible for writing to a <code>Directory</code>
+ * 
+ * @since 4.0
+ * @author Lukasz Moren
+ * @author Davide Di Somma
+ * @see org.apache.lucene.store.Directory
+ * @see org.apache.lucene.store.IndexOutput
+ */
+public class InfinispanIndexOutput extends IndexOutput {
+
+   private static final Log log = LogFactory.getLog(InfinispanIndexOutput.class);
+
+   private final int bufferSize;
+
+   private final AdvancedCache<CacheKey, Object> cache;
+   private final FileMetadata file;
+   private final FileCacheKey fileKey;
+
+   private byte[] buffer;
+   private int bufferPosition = 0;
+   private int filePosition = 0;
+   private int chunkNumber;
+
+   public InfinispanIndexOutput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
+      this.cache = cache;
+      this.fileKey = fileKey;
+      this.bufferSize = bufferSize;
+      this.buffer = new byte[this.bufferSize];
+      this.file = fileMetadata;
+      if (log.isDebugEnabled()) {
+         log.debug("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      }
+   }
+   
+   private static byte[] getChunkFromPosition(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int pos, int bufferSize) {
+      CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), getChunkNumberFromPosition(pos, bufferSize));
+      return (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
+   }
+   
+   private static int getPositionInBuffer(int pos, int bufferSize) {
+      return (pos % bufferSize);
+   }
+
+   private static int getChunkNumberFromPosition(int pos, int bufferSize) {
+      return ((pos) / (bufferSize));
+   }
+
+   private void newChunk() throws IOException {
+      flush();// save data first
+      // check if we have to create new chunk, or get already existing in cache for modification
+      if ((buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize)) == null) {
+         buffer = new byte[bufferSize];
+      }
+      bufferPosition = 0;
+   }
+
+   public void writeByte(byte b) throws IOException {
+      if (isNewChunkNeeded()) {
+         newChunk();
+      }
+      buffer[bufferPosition++] = b;
+      filePosition++;
+   }
+
+   public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      int writedBytes = 0;
+      while (writedBytes < length) {
+         int pieceLength = Math.min(buffer.length - bufferPosition, length - writedBytes);
+         System.arraycopy(b, offset + writedBytes, buffer, bufferPosition, pieceLength);
+         bufferPosition += pieceLength;
+         filePosition += pieceLength;
+         writedBytes += pieceLength;
+         if (isNewChunkNeeded()) {
+            newChunk();
+         }
+      }
+   }
+
+   private boolean isNewChunkNeeded() {
+      return (bufferPosition == buffer.length);
+   }
+
+   public void flush() throws IOException {
+      // select right chunkNumber
+      chunkNumber = getChunkNumberFromPosition(filePosition - 1, bufferSize);
+      // and create distinct key for it
+      ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), chunkNumber);
+      // size changed, apply change to file header
+      file.touch();
+      if (file.getSize() < filePosition) {
+         file.setSize(filePosition);
+      }
+      cache.startBatch();
+      // add chunk to cache
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, buffer);
+      // overwrite existing file header with new size and last access time
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
+      cache.endBatch(true);
+   }
+
+   public void close() throws IOException {
+      flush();
+      bufferPosition = 0;
+      filePosition = 0;
+      buffer = null;
+      if (log.isDebugEnabled()) {
+         log.debug("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      }
+      // cache.compact(); //TODO investigate about this
+   }
+
+   public long getFilePointer() {
+      return filePosition;
+   }
+
+   public void seek(long pos) throws IOException {
+      flush();
+      if (pos > file.getSize()) {
+         throw new IOException(fileKey.getFileName() + ": seeking past of the file");
+      }
+      buffer = getChunkFromPosition(cache, fileKey, (int) pos, bufferSize);
+      bufferPosition = getPositionInBuffer((int) pos, bufferSize);
+      filePosition = (int) pos;
+   }
+
+   public long length() throws IOException {
+      return file.getSize();
+   }
+
+}
\ No newline at end of file



More information about the infinispan-commits mailing list