[jbosscache-commits] JBoss Cache SVN: r6938 - in core/branches/flat/src/main/java/org/jboss/starobrno: cluster and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Oct 14 13:42:47 EDT 2008


Author: mircea.markus
Date: 2008-10-14 13:42:46 -0400 (Tue, 14 Oct 2008)
New Revision: 6938

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/cluster/
   core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
Removed:
   core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
Log:


Copied: core/branches/flat/src/main/java/org/jboss/starobrno/cluster (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/cluster)

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java	2008-10-09 09:55:17 UTC (rev 6895)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java	2008-10-14 17:42:46 UTC (rev 6938)
@@ -1,184 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.RPCManager;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.remote.ReplicateCommand;
-import org.jboss.starobrno.config.Configuration;
-import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.factories.annotations.Start;
-import org.jboss.starobrno.factories.annotations.Stop;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Periodically (or when certain size is exceeded) takes elements and replicates them.
- *
- * @author <a href="mailto:bela at jboss.org">Bela Ban</a> May 24, 2003
- * @version $Revision: 5197 $
- */
-public class ReplicationQueue
-{
-
-   private static final Log log = LogFactory.getLog(ReplicationQueue.class);
-
-   /**
-    * Max elements before we flush
-    */
-   private long max_elements = 500;
-
-   /**
-    * Holds the replication jobs: LinkedList<MethodCall>
-    */
-   final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
-
-   /**
-    * For periodical replication
-    */
-   private ScheduledExecutorService scheduledExecutor = null;
-   private RPCManager rpcManager;
-   private Configuration configuration;
-   private boolean enabled;
-   private CommandsFactory commandsFactory;
-   private static final AtomicInteger counter = new AtomicInteger(0);
-
-   public boolean isEnabled()
-   {
-      return enabled;
-   }
-
-   @Inject
-   private void injectDependencies(RPCManager rpcManager, Configuration configuration, CommandsFactory commandsFactory)
-   {
-      this.rpcManager = rpcManager;
-      this.configuration = configuration;
-      this.commandsFactory = commandsFactory;
-
-      // this is checked again in Start
-      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
-   }
-
-   /**
-    * Starts the asynchronous flush queue.
-    */
-   @Start
-   public synchronized void start()
-   {
-      long interval = configuration.getReplQueueInterval();
-      this.max_elements = configuration.getReplQueueMaxElements();
-      // check again
-      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
-      if (enabled)
-      {
-         if (interval > 0)
-         {
-            if (scheduledExecutor == null)
-            {
-               scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
-               {
-                  public Thread newThread(Runnable r)
-                  {
-                     return new Thread(r, "ReplicationQueue-periodicProcessor-" + counter.getAndIncrement());
-                  }
-               });
-               scheduledExecutor.scheduleWithFixedDelay(new Runnable()
-               {
-                  public void run()
-                  {
-                     flush();
-                  }
-               }, 500l, interval, TimeUnit.MILLISECONDS);
-            }
-         }
-      }
-   }
-
-   /**
-    * Stops the asynchronous flush queue.
-    */
-   @Stop
-   public synchronized void stop()
-   {
-      if (scheduledExecutor != null)
-      {
-         scheduledExecutor.shutdownNow();
-      }
-      scheduledExecutor = null;
-   }
-
-
-   /**
-    * Adds a new method call.
-    */
-   public void add(ReplicateCommand job)
-   {
-      if (job == null)
-         throw new NullPointerException("job is null");
-      synchronized (elements)
-      {
-         elements.add(job);
-         if (elements.size() >= max_elements)
-            flush();
-      }
-   }
-
-   /**
-    * Flushes existing method calls.
-    */
-   public void flush()
-   {
-      List<ReplicableCommand> toReplicate;
-      synchronized (elements)
-      {
-         if (log.isTraceEnabled())
-            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
-         toReplicate = new ArrayList<ReplicableCommand>(elements);
-         elements.clear();
-      }
-
-      if (toReplicate.size() > 0)
-      {
-         try
-         {
-
-            ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
-            // send to all live nodes in the cluster
-            rpcManager.callRemoteMethods(null, replicateCommand, false, configuration.getSyncReplTimeout(), false);
-         }
-         catch (Throwable t)
-         {
-            log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
-         }
-      }
-   }
-}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java	2008-10-14 17:42:46 UTC (rev 6938)
@@ -0,0 +1,184 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.remote.ReplicateCommand;
+import org.jboss.starobrno.RPCManager;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically (or when certain size is exceeded) takes elements and replicates them.
+ *
+ * @author <a href="mailto:bela at jboss.org">Bela Ban</a> May 24, 2003
+ * @version $Revision: 5197 $
+ */
+public class ReplicationQueue
+{
+
+   private static final Log log = LogFactory.getLog(ReplicationQueue.class);
+
+   /**
+    * Max elements before we flush
+    */
+   private long max_elements = 500;
+
+   /**
+    * Holds the replication jobs: LinkedList<MethodCall>
+    */
+   final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
+
+   /**
+    * For periodical replication
+    */
+   private ScheduledExecutorService scheduledExecutor = null;
+   private RPCManager rpcManager;
+   private Configuration configuration;
+   private boolean enabled;
+   private CommandsFactory commandsFactory;
+   private static final AtomicInteger counter = new AtomicInteger(0);
+
+   public boolean isEnabled()
+   {
+      return enabled;
+   }
+
+   @Inject
+   private void injectDependencies(RPCManager rpcManager, Configuration configuration, CommandsFactory commandsFactory)
+   {
+      this.rpcManager = rpcManager;
+      this.configuration = configuration;
+      this.commandsFactory = commandsFactory;
+
+      // this is checked again in Start
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
+   }
+
+   /**
+    * Starts the asynchronous flush queue.
+    */
+   @Start
+   public synchronized void start()
+   {
+      long interval = configuration.getReplQueueInterval();
+      this.max_elements = configuration.getReplQueueMaxElements();
+      // check again
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
+      if (enabled)
+      {
+         if (interval > 0)
+         {
+            if (scheduledExecutor == null)
+            {
+               scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
+               {
+                  public Thread newThread(Runnable r)
+                  {
+                     return new Thread(r, "ReplicationQueue-periodicProcessor-" + counter.getAndIncrement());
+                  }
+               });
+               scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+               {
+                  public void run()
+                  {
+                     flush();
+                  }
+               }, 500l, interval, TimeUnit.MILLISECONDS);
+            }
+         }
+      }
+   }
+
+   /**
+    * Stops the asynchronous flush queue.
+    */
+   @Stop
+   public synchronized void stop()
+   {
+      if (scheduledExecutor != null)
+      {
+         scheduledExecutor.shutdownNow();
+      }
+      scheduledExecutor = null;
+   }
+
+
+   /**
+    * Adds a new method call.
+    */
+   public void add(ReplicateCommand job)
+   {
+      if (job == null)
+         throw new NullPointerException("job is null");
+      synchronized (elements)
+      {
+         elements.add(job);
+         if (elements.size() >= max_elements)
+            flush();
+      }
+   }
+
+   /**
+    * Flushes existing method calls.
+    */
+   public void flush()
+   {
+      List<ReplicableCommand> toReplicate;
+      synchronized (elements)
+      {
+         if (log.isTraceEnabled())
+            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
+         toReplicate = new ArrayList<ReplicableCommand>(elements);
+         elements.clear();
+      }
+
+      if (toReplicate.size() > 0)
+      {
+         try
+         {
+
+            ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
+            // send to all live nodes in the cluster
+            rpcManager.callRemoteMethods(null, replicateCommand, false, configuration.getSyncReplTimeout(), false);
+         }
+         catch (Throwable t)
+         {
+            log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
+         }
+      }
+   }
+}
\ No newline at end of file




More information about the jbosscache-commits mailing list