[exo-jcr-commits] exo-jcr SVN: r5451 - jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 13 10:23:36 EST 2012


Author: nzamosenchuk
Date: 2012-01-13 10:23:33 -0500 (Fri, 13 Jan 2012)
New Revision: 5451

Added:
   jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
Modified:
   jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
   jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java
Log:
EXOJCR-1709 : initial implementation based on PoC. 
What is ready :
- initial code-architecture;
- index sync, when index list changes;
- configuration;

TODO:
- Sync index on start to get initial index;
- Failover capabilities;
- User/password for RSync server;

RSync approach is built upon JBossCache concept, overriding just few things that adds RSync invocation when processing an index list modification event

Modified: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java	2012-01-13 09:39:01 UTC (rev 5450)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java	2012-01-13 15:23:33 UTC (rev 5451)
@@ -29,6 +29,8 @@
 import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
 import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
 import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
+import org.exoplatform.services.jcr.impl.core.query.lucene.SearchIndex;
 import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory;
 import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory.CacheType;
 import org.exoplatform.services.jcr.jbosscache.PrivilegedJBossCacheHelper;
@@ -75,10 +77,25 @@
 
    public static final Boolean PARAM_JBOSSCACHE_SHAREABLE_DEFAULT = Boolean.FALSE;
 
+   // RSYNC SERVER CONFIGURATION
+   public static final String PARAM_RSYNC_ENTRY_NAME = "rsync-entry-name";
+
+   public static final String PARAM_RSYNC_ENTRY_PATH = "rsync-entry-path";
+
+   public static final String PARAM_RSYNC_PORT = "rsync-port";
+
+   public static final int PARAM_RSYNC_PORT_DEFAULT = 873;
+
+   // TODO
+   public static final String PARAM_RSYNC_USER = "rsync-user";
+
+   public static final String PARAM_RSYNC_PASSWORD = "rsync-password";
+
+   // Fields
    private final Cache<Serializable, Object> cache;
 
    private final Fqn<String> rootFqn;
-   
+
    private final JmxRegistrationManager jmxManager;
 
    public static final String LISTWRAPPER = "$lists".intern();
@@ -140,8 +157,10 @@
 
       PrivilegedJBossCacheHelper.create(cache);
       PrivilegedJBossCacheHelper.start(cache);
-      
-      this.jmxManager = ExoJBossCacheFactory.getJmxRegistrationManager(searchManager.getExoContainerContext(), cache, CacheType.INDEX_CACHE);
+
+      this.jmxManager =
+         ExoJBossCacheFactory.getJmxRegistrationManager(searchManager.getExoContainerContext(), cache,
+            CacheType.INDEX_CACHE);
       if (jmxManager != null)
       {
          SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
@@ -171,18 +190,50 @@
 
       if (!parentHandler.isInitialized())
       {
-         parentHandler.setIndexInfos(new JBossCacheIndexInfos(rootFqn, cache, true, modeHandler));
+         parentHandler.setIndexInfos(createIndexInfos(true, modeHandler, config, parentHandler));
          parentHandler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(rootFqn, cache, true, modeHandler));
          parentHandler.init();
       }
       if (!handler.isInitialized())
       {
-         handler.setIndexInfos(new JBossCacheIndexInfos(rootFqn, cache, false, modeHandler));
+         handler.setIndexInfos(createIndexInfos(false, modeHandler, config, handler));
          handler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(rootFqn, cache, false, modeHandler));
          handler.init();
       }
    }
 
+   /**
+    * Factory method for creating corresponding IndexInfos class. RSyncIndexInfos created if RSync configured
+    * and JBossCacheIndexInfos otherwise
+    * 
+    * @param system
+    * @param modeHandler
+    * @param config
+    * @param handler
+    * @return
+    * @throws RepositoryConfigurationException
+    */
+   private IndexInfos createIndexInfos(Boolean system, IndexerIoModeHandler modeHandler, QueryHandlerEntry config,
+      QueryHandler handler) throws RepositoryConfigurationException
+   {
+      // read RSYNC configuration
+      String rsyncEntryName = config.getParameterValue(PARAM_RSYNC_ENTRY_NAME, null);
+      String rsyncEntryPath = config.getParameterValue(PARAM_RSYNC_ENTRY_PATH, null);
+      int rsyncPort = config.getParameterInteger(PARAM_RSYNC_PORT, PARAM_RSYNC_PORT_DEFAULT);
+
+      // rsync configured
+      if (rsyncEntryName != null)
+      {
+         return new RsyncIndexInfos(rootFqn, cache, system, modeHandler, ((SearchIndex)handler).getContext()
+            .getIndexDirectory(), rsyncPort, rsyncEntryName, rsyncEntryPath);
+      }
+      else
+      {
+         return new JBossCacheIndexInfos(rootFqn, cache, system, modeHandler);
+      }
+
+   }
+
    protected Log getLogger()
    {
       return log;
@@ -202,9 +253,9 @@
       return true;
    }
 
-    /**
-    * {@inheritDoc}
-    */
+   /**
+   * {@inheritDoc}
+   */
    @Override
    public void close()
    {
@@ -222,8 +273,9 @@
             });
          }
       }
-      catch (Exception e) {
+      catch (Exception e)
+      {
          log.warn("Not all JBoss Cache MBeans were unregistered.");
       }
-   } 
+   }
 }

Modified: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java	2012-01-13 09:39:01 UTC (rev 5450)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java	2012-01-13 15:23:33 UTC (rev 5451)
@@ -64,17 +64,17 @@
 
    private static final String LIST_KEY = "$listOfIndexes".intern();
 
-   private final Cache<Serializable, Object> cache;
+   protected final Cache<Serializable, Object> cache;
 
    /**
     * Flag notifies if this IndexInfos is from system search manager or not.
     */
-   private boolean system;
+   protected boolean system;
 
    /**
     * Used to retrieve the current mode
     */
-   private final IndexerIoModeHandler modeHandler;
+   protected final IndexerIoModeHandler modeHandler;
 
    /**
     * This FQN points to cache node, where list of indexes for this {@link IndexInfos} instance is stored.
@@ -192,23 +192,36 @@
             // read from cache to update lists
             set = (Set<String>)cache.get(namesFqn, LIST_KEY);
          }
-         if (set != null)
+         refreshIndexes(set);
+      }
+   }
+
+   /**
+    * Update index configuration, when it changes on persistent storage 
+    * 
+    * @param set
+    */
+   protected void refreshIndexes(Set<String> set)
+   {
+      // do nothing if null is passed
+      if (set == null)
+      {
+         return;
+      }
+      setNames(set);
+      // callback multiIndex to refresh lists
+      try
+      {
+         MultiIndex multiIndex = getMultiIndex();
+         if (multiIndex != null)
          {
-            setNames(set);
-            // callback multiIndex to refresh lists
-            try
-            {
-               MultiIndex multiIndex = getMultiIndex();
-               if (multiIndex != null)
-               {
-                  multiIndex.refreshIndexList();
-               }
-            }
-            catch (IOException e)
-            {
-               log.error("Failed to update indexes! " + e.getMessage(), e);
-            }
+            multiIndex.refreshIndexList();
          }
       }
+      catch (IOException e)
+      {
+         log.error("Failed to update indexes! " + e.getMessage(), e);
+      }
    }
+
 }

Added: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java	                        (rev 0)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java	2012-01-13 15:23:33 UTC (rev 5451)
@@ -0,0 +1,186 @@
+/*
+ * Copyright (C) 2003-2012 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache;
+
+import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jgroups.stack.IpAddress;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Created by The eXo Platform SAS
+ * Author : eXoPlatform
+ *          exo at exoplatform.com
+ * Jan 11, 2012  
+ */
+ at CacheListener
+public class RsyncIndexInfos extends JBossCacheIndexInfos
+{
+
+   private final Log log = ExoLogger.getLogger("exo.jcr.component.core.RsyncIndexInfos");
+
+   private final String indexPath;
+
+   private final String urlFormatString;
+
+   public RsyncIndexInfos(Fqn<String> rootFqn, Cache<Serializable, Object> cache, boolean system,
+      IndexerIoModeHandler modeHandler, String indexPath, int rsyncPort, String rsyncEntryName, String rsyncEntryPath)
+      throws RepositoryConfigurationException
+   {
+      super(rootFqn, cache, system, modeHandler);
+
+      String absoluteRsyncEntryPath;
+      try
+      {
+         this.indexPath = new File(indexPath).getCanonicalPath();
+         absoluteRsyncEntryPath = new File(rsyncEntryPath).getCanonicalPath();
+      }
+      catch (IOException e)
+      {
+         throw new RepositoryConfigurationException("Index path or rsyncEntry path is invalid.", e);
+      }
+
+      if (this.indexPath.startsWith(absoluteRsyncEntryPath))
+      {
+         // in relation to RSync Server Entry
+         // i.e. absolute index path is /var/portal/data/index/repo1/ws2
+         // i.e. RSync Server Entry is "index" pointing to /var/portal/data/index
+         // then relative path is repo1/ws2
+         // and whole url is "rsync://<addr>:<port>/<entryName>/repo1/ws2"
+         String relativeIndexPath = this.indexPath.substring(absoluteRsyncEntryPath.length());
+
+         // if client is Windows OS, need to replace all '\' with '/' used in url
+         if (File.separatorChar == '\\')
+         {
+            relativeIndexPath = relativeIndexPath.replace(File.separatorChar, '/');
+         }
+         // generate ready-to-use formatter string with address variable 
+         urlFormatString = "rsync://%s:" + rsyncPort + "/" + rsyncEntryName + relativeIndexPath + "/";
+      }
+      else
+      {
+         throw new RepositoryConfigurationException(
+            "Invalid RSync configuration. Index must be placed in folder that is a descendant of RSync Server Entry. Current RSync Server Entry Path is : "
+               + absoluteRsyncEntryPath
+               + " but it doesnt hold Index folder, that is : "
+               + this.indexPath
+               + ". Please fix configuration according to JCR Documentation and restart application.");
+      }
+
+   }
+
+    /**
+    * {@inheritDoc}
+    */
+   @Override
+   protected void refreshIndexes(Set<String> set)
+   {
+      // Call RSync to retrieve actual index from coordinator
+      if (modeHandler.getMode() == IndexerIoMode.READ_ONLY)
+      {
+         if (cache.getLocalAddress() instanceof IpAddress)
+         {
+            // Coordinator's address 
+            String address =
+               ((IpAddress)((CacheSPI)cache).getRPCManager().getCoordinator()).getIpAddress().getHostAddress();
+            RSyncJob rSyncJob = new RSyncJob(String.format(urlFormatString, address), indexPath);
+            try
+            {
+               synchronized (this)
+               {
+                  rSyncJob.execute();
+               }
+            }
+            catch (IOException e)
+            {
+               log.error("Failed to retrieve index using RSYNC", e);
+            }
+         }
+      }
+      // call super, after indexes are synchronized
+      super.refreshIndexes(set);
+   }
+
+   private class RSyncJob
+   {
+      private Process process;
+
+      private String src;
+
+      private String dst;
+
+      public RSyncJob(String src, String dst)
+      {
+         this.src = src.endsWith(File.separator) ? src : src + File.separator;
+         this.dst = dst;
+      }
+
+      public void execute() throws IOException
+      {
+         Runtime run = Runtime.getRuntime();
+         try
+         {
+            String command = "rsync -rv --delete " + src + " " + dst;
+            log.info("Rsync job started: " + command);
+            process = run.exec(command);
+
+            Integer returnCode = null;
+            // wait for thread
+            while (returnCode == null)
+            {
+               try
+               {
+                  returnCode = process.waitFor();
+               }
+               catch (InterruptedException e)
+               {
+                  // oops, this can happen sometimes
+               }
+            }
+            log.info("Rsync job finished: " + returnCode);
+            if (returnCode != 0)
+            {
+               throw new IOException("RSync job finished with exit code is " + returnCode);
+            }
+         }
+         finally
+         {
+            process = null;
+         }
+      }
+
+      public void forceCancel()
+      {
+         if (process != null)
+         {
+            process.destroy();
+         }
+      }
+   }
+
+}


Property changes on: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain



More information about the exo-jcr-commits mailing list