[jboss-svn-commits] JBL Code SVN: r35820 - in labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src: main/java/org/drools/grid/impl and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Nov 1 12:25:30 EDT 2010
Author: salaboy21
Date: 2010-11-01 12:25:28 -0400 (Mon, 01 Nov 2010)
New Revision: 35820
Added:
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/RegisterWhitePagesConfiguration.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesServiceConfiguration.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/RegisterSchedulerConfiguration.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RegisterServicesTest.java
Modified:
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/MultiplexSocketService.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/MultiplexSocketServerImpl.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/io/impl/ConversationImpl.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesSocketConfiguration.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerSocketConfiguration.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/DistributedSchedulerTest.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/MockJob.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RemoteWhitePagesTest.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/WhitePagesTest.java
Log:
JBRULES-2747: Drools Grid Scheduler Impl
- adding SchedulerClient logic
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/MultiplexSocketService.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/MultiplexSocketService.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/MultiplexSocketService.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -1,5 +1,6 @@
package org.drools.grid;
+import java.util.Set;
import org.drools.grid.io.MessageReceiverHandler;
public interface MultiplexSocketService {
@@ -13,5 +14,7 @@
String getIp();
+ Set<Integer> getPorts();
+
void close();
}
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/MultiplexSocketServerImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/MultiplexSocketServerImpl.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/MultiplexSocketServerImpl.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -3,6 +3,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.drools.SystemEventListener;
import org.drools.grid.MultiplexSocketService;
@@ -77,4 +78,14 @@
public String getIp() {
return this.ip;
}
+
+    /**
+     * Returns the keys of the {@code acceptors} map, exposed as this server's ports.
+     *
+     * @return a read-only view of those keys; it cannot be used to remove acceptors
+     */
+    public Set<Integer> getPorts() {
+        return java.util.Collections.unmodifiableSet( acceptors.keySet() );
+    }
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/io/impl/ConversationImpl.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/io/impl/ConversationImpl.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/io/impl/ConversationImpl.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -70,7 +70,7 @@
}
public void sendMessage(Object body,
- MessageReceiverHandler handler) {
+ MessageReceiverHandler handler) {
int requestId = -1;
if ( handler != null ) {
requestId = this.requestId.getAndIncrement();
Added: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/RegisterWhitePagesConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/RegisterWhitePagesConfiguration.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/RegisterWhitePagesConfiguration.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.service.directory.impl;
+
+
+import java.net.InetSocketAddress;
+import org.drools.grid.CoreServicesWhitePages;
+import org.drools.grid.Grid;
+import org.drools.grid.GridPeerServiceConfiguration;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.MultiplexSocketService;
+import org.drools.grid.service.directory.Address;
+import org.drools.grid.service.directory.WhitePages;
+import org.drools.grid.timer.impl.ServiceConfiguration;
+import org.drools.time.SchedulerService;
+
+/**
+ * Grid peer configuration step that publishes this peer's socket endpoint for
+ * the {@link WhitePages} service in the core-services white pages.
+ *
+ * @author salaboy
+ */
+public class RegisterWhitePagesConfiguration implements GridPeerServiceConfiguration {
+
+    public RegisterWhitePagesConfiguration() {
+    }
+
+    /**
+     * Adds the local {@link MultiplexSocketService} endpoint to the
+     * {@link WhitePages} entry of the core-services white pages.
+     * <p>
+     * The endpoint is appended to the entry's "socket" address array, and the
+     * grown array is also published as the entry's {@link ServiceConfiguration}
+     * data, so both list every endpoint registered so far.
+     *
+     * @param grid grid supplying {@link CoreServicesWhitePages} and {@link MultiplexSocketService}
+     * @throws IllegalStateException if the socket service reports no ports
+     */
+    public void configureService(Grid grid) {
+        CoreServicesWhitePagesImpl coreServicesWP = (CoreServicesWhitePagesImpl) grid.get( CoreServicesWhitePages.class );
+
+        GridServiceDescriptionImpl gsd = (GridServiceDescriptionImpl) coreServicesWP.lookup( WhitePages.class );
+        if ( gsd == null ) {
+            gsd = new GridServiceDescriptionImpl( WhitePages.class );
+        }
+
+        MultiplexSocketService mss = grid.get( MultiplexSocketService.class );
+        if ( mss.getPorts().isEmpty() ) {
+            throw new IllegalStateException( "MultiplexSocketService has no ports; cannot register " + WhitePages.class.getName() );
+        }
+        // Whichever port the set's iterator yields first is the one advertised.
+        int port = mss.getPorts().iterator().next();
+
+        GridServiceDescription service = coreServicesWP.getServices().get( WhitePages.class.getName() );
+        if ( service == null ) {
+            coreServicesWP.getServices().put( WhitePages.class.getName(), gsd );
+            service = gsd;
+        }
+
+        Address address = service.getAddresses().get( "socket" );
+        if ( address == null ) {
+            address = service.addAddress( "socket" );
+        }
+
+        // Grow the endpoint list by one; a missing or empty array starts a new list.
+        InetSocketAddress[] existing = (InetSocketAddress[]) address.getObject();
+        int existingCount = ( existing == null ) ? 0 : existing.length;
+        InetSocketAddress[] updated = new InetSocketAddress[existingCount + 1];
+        if ( existingCount > 0 ) {
+            System.arraycopy( existing, 0, updated, 0, existingCount );
+        }
+        updated[existingCount] = new InetSocketAddress( mss.getIp(), port );
+
+        // Store the grown array back on the address as well: otherwise the next
+        // registration would start from the stale array and drop this endpoint.
+        address.setObject( updated );
+        ServiceConfiguration conf = new WhitePagesServiceConfiguration( updated );
+        service.setData( conf );
+    }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesClient.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesClient.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -1,10 +1,10 @@
package org.drools.grid.service.directory.impl;
+import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.drools.SystemEventListenerFactory;
import org.drools.grid.GridServiceDescription;
import org.drools.grid.MessageReceiverHandlerFactoryService;
@@ -13,8 +13,6 @@
import org.drools.grid.io.ConversationManager;
import org.drools.grid.io.MessageReceiverHandler;
import org.drools.grid.io.impl.CommandImpl;
-import org.drools.grid.io.impl.ConversationManagerImpl;
-import org.drools.grid.remote.mina.MinaConnector;
import org.drools.grid.service.directory.Address;
import org.drools.grid.service.directory.WhitePages;
@@ -32,15 +30,19 @@
this.conversationManager = conversationManager;
}
- public WhitePagesClient(GridServiceDescription gsd) {
- this.whitePagesGsd = gsd;
- this.conversationManager = new ConversationManagerImpl("wpclient", new MinaConnector(), SystemEventListenerFactory.getSystemEventListener());
- }
public static Object sendMessage(ConversationManager conversationManager,
- InetSocketAddress[] sockets,
+ Serializable addr,
String id,
Object body) {
+
+ InetSocketAddress[] sockets = null;
+ if(addr instanceof InetSocketAddress[]){
+ sockets = (InetSocketAddress[])addr;
+ }else if (addr instanceof InetSocketAddress){
+ sockets = new InetSocketAddress[1];
+ sockets[0] = (InetSocketAddress)addr;
+ }
BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
Exception exception = null;
for ( InetSocketAddress socket : sockets ) {
@@ -109,25 +111,7 @@
return new WhitePagesServer( this );
}
- public List<GridServiceDescription> lookupServices(Class clazz) {
- InetSocketAddress[] sockets = (InetSocketAddress[]) ((Address) whitePagesGsd.getAddresses().get( "socket" )).getObject();
- CommandImpl cmd = new CommandImpl( "WhitePages.lookupServices",
- Arrays.asList( new Object[]{ clazz } ) );
- List<GridServiceDescription> gsds = ( List<GridServiceDescription> ) sendMessage( this.conversationManager,
- sockets,
- this.whitePagesGsd.getId(),
- cmd);
- List<GridServiceDescription> result = new ArrayList<GridServiceDescription>();
-
- for(GridServiceDescription gsd : gsds){
- result.add( new GridServiceDescriptionClient(gsd,
- this.whitePagesGsd,
- this.conversationManager ));
- }
-
-
- return result;
- }
+
// public void addAddress(String id,
// Address address) {
Copied: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesServiceConfiguration.java (from rev 35804, labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerServiceConfiguration.java)
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesServiceConfiguration.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesServiceConfiguration.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.service.directory.impl;
+
+import org.drools.grid.timer.impl.*;
+import java.net.InetSocketAddress;
+import org.drools.grid.Grid;
+
+/**
+ * {@link ServiceConfiguration} for the white pages service, backed by a fixed
+ * array of socket addresses supplied at construction time.
+ *
+ * @author salaboy
+ */
+public class WhitePagesServiceConfiguration implements ServiceConfiguration{
+
+    /** Socket addresses handed out by {@link #getServices(Grid)}; stored without copying. */
+    private final InetSocketAddress[] addresses;
+
+    /**
+     * Creates a configuration exposing the given addresses.
+     *
+     * @param addresses array returned as-is by {@link #getServices(Grid)}
+     */
+    public WhitePagesServiceConfiguration(InetSocketAddress[] addresses) {
+        this.addresses = addresses;
+    }
+
+    /**
+     * Returns the addresses this configuration was created with.
+     *
+     * @param grid currently unused; the result does not depend on the grid
+     * @return the array passed to the constructor (not a copy)
+     */
+    public InetSocketAddress[] getServices(Grid grid) {
+        return this.addresses;
+    }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesSocketConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesSocketConfiguration.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/service/directory/impl/WhitePagesSocketConfiguration.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -41,30 +41,30 @@
MultiplexSocketService mss = grid.get( MultiplexSocketService.class );
- GridServiceDescription service = coreServicesWP.getServices().get( WhitePages.class.getName() );
- if( service == null){
- coreServicesWP.getServices().put(WhitePages.class.getName(), gsd);
- service = gsd;
- }
-
- Address address = null;
- if(service.getAddresses().get("socket") != null){
- address = service.getAddresses().get("socket");
- } else{
- address = service.addAddress( "socket" );
- }
- InetSocketAddress[] addresses = (InetSocketAddress[])address.getObject();
- int newAddressesLenght = 1;
- if(addresses != null){
- newAddressesLenght = addresses.length + 1;
- }
- InetSocketAddress[] newAddresses = new InetSocketAddress[newAddressesLenght];
- if(addresses !=null){
- System.arraycopy(addresses, 0, newAddresses, 0, addresses.length);
- }
- newAddresses[newAddressesLenght-1]= new InetSocketAddress( mss.getIp(),
- this.port);
- address.setObject( newAddresses );
+// GridServiceDescription service = coreServicesWP.getServices().get( WhitePages.class.getName() );
+// if( service == null){
+// coreServicesWP.getServices().put(WhitePages.class.getName(), gsd);
+// service = gsd;
+// }
+//
+// Address address = null;
+// if(service.getAddresses().get("socket") != null){
+// address = service.getAddresses().get("socket");
+// } else{
+// address = service.addAddress( "socket" );
+// }
+// InetSocketAddress[] addresses = (InetSocketAddress[])address.getObject();
+// int newAddressesLenght = 1;
+// if(addresses != null){
+// newAddressesLenght = addresses.length + 1;
+// }
+// InetSocketAddress[] newAddresses = new InetSocketAddress[newAddressesLenght];
+// if(addresses !=null){
+// System.arraycopy(addresses, 0, newAddresses, 0, addresses.length);
+// }
+// newAddresses[newAddressesLenght-1]= new InetSocketAddress( mss.getIp(),
+// this.port);
+// address.setObject( newAddresses );
mss.addService( this.port,
WhitePages.class.getName(),
Added: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/RegisterSchedulerConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/RegisterSchedulerConfiguration.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/RegisterSchedulerConfiguration.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+package org.drools.grid.timer.impl;
+
+import org.drools.grid.service.directory.impl.*;
+import java.net.InetSocketAddress;
+import org.drools.grid.CoreServicesWhitePages;
+import org.drools.grid.Grid;
+import org.drools.grid.GridPeerServiceConfiguration;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.MultiplexSocketService;
+import org.drools.grid.service.directory.Address;
+import org.drools.time.SchedulerService;
+
+/**
+ * Grid peer configuration step that publishes this peer's socket endpoint for
+ * the {@link SchedulerService} in the core-services white pages.
+ *
+ * @author salaboy
+ */
+public class RegisterSchedulerConfiguration implements GridPeerServiceConfiguration {
+
+    public RegisterSchedulerConfiguration() {
+    }
+
+    /**
+     * Adds the local {@link MultiplexSocketService} endpoint to the
+     * {@link SchedulerService} entry of the core-services white pages.
+     * <p>
+     * The endpoint is appended to the entry's "socket" address array, and the
+     * grown array is also published as the entry's {@link ServiceConfiguration}
+     * data, so both list every endpoint registered so far.
+     *
+     * @param grid grid supplying {@link CoreServicesWhitePages} and {@link MultiplexSocketService}
+     * @throws IllegalStateException if the socket service reports no ports
+     */
+    public void configureService(Grid grid) {
+        CoreServicesWhitePagesImpl coreServicesWP = (CoreServicesWhitePagesImpl) grid.get( CoreServicesWhitePages.class );
+
+        GridServiceDescriptionImpl gsd = (GridServiceDescriptionImpl) coreServicesWP.lookup( SchedulerService.class );
+        if ( gsd == null ) {
+            gsd = new GridServiceDescriptionImpl( SchedulerService.class );
+        }
+
+        MultiplexSocketService mss = grid.get( MultiplexSocketService.class );
+        if ( mss.getPorts().isEmpty() ) {
+            throw new IllegalStateException( "MultiplexSocketService has no ports; cannot register " + SchedulerService.class.getName() );
+        }
+        // Whichever port the set's iterator yields first is the one advertised.
+        int port = mss.getPorts().iterator().next();
+
+        GridServiceDescription service = coreServicesWP.getServices().get( SchedulerService.class.getName() );
+        if ( service == null ) {
+            coreServicesWP.getServices().put( SchedulerService.class.getName(), gsd );
+            service = gsd;
+        }
+
+        Address address = service.getAddresses().get( "socket" );
+        if ( address == null ) {
+            address = service.addAddress( "socket" );
+        }
+
+        // Grow the endpoint list by one; a missing or empty array starts a new list.
+        InetSocketAddress[] existing = (InetSocketAddress[]) address.getObject();
+        int existingCount = ( existing == null ) ? 0 : existing.length;
+        InetSocketAddress[] updated = new InetSocketAddress[existingCount + 1];
+        if ( existingCount > 0 ) {
+            System.arraycopy( existing, 0, updated, 0, existingCount );
+        }
+        updated[existingCount] = new InetSocketAddress( mss.getIp(), port );
+
+        // Store the grown array back on the address as well: otherwise the next
+        // registration would start from the stale array and drop this endpoint.
+        address.setObject( updated );
+        ServiceConfiguration conf = new SchedulerServiceConfiguration( updated );
+        service.setData( conf );
+    }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerClient.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerClient.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -22,6 +22,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.drools.grid.Grid;
import org.drools.grid.GridServiceDescription;
import org.drools.grid.MessageReceiverHandlerFactoryService;
@@ -105,13 +107,39 @@
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
- SchedulerServiceConfiguration conf = (SchedulerServiceConfiguration) schedulerGsd.getData();
List<JobHandle> jobHandles = new ArrayList<JobHandle>();
UuidJobHandle jobhandle = new UuidJobHandle();
- for( int i = 0; i < conf.getRedundancy(); i ++){
- int bucket = (int)jobhandle.hashCode() % conf.getServices(grid).length;
+ // Get the Service Configuration from the Data field
+ SchedulerServiceConfiguration conf = (SchedulerServiceConfiguration) schedulerGsd.getData();
+ // If the GSD doesn't have conf and it doesn't have addresses, we can use the local SchedulerService
+ if(conf == null && schedulerGsd.getAddresses().get("socket") == null){
+ SchedulerService sched = null;
+ try {
+ // We use the ID that contains the type of the service that we are using -> refactor this and include serviceType in GSD
+ sched = grid.get((Class<SchedulerService>)Class.forName(schedulerGsd.getId()));
+ } catch (ClassNotFoundException ex) {
+ Logger.getLogger(SchedulerClient.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ return sched.scheduleJob(job, ctx, trigger);
+ }
+ // If we have a service configuration
+ int redundancy = 1;
+ InetSocketAddress[] addresses = null;
+ if(conf != null){
+ redundancy = conf.getRedundancy();
+ addresses = conf.getServices(grid);
+ }
+ // If we have an address use that address.
+ if(addresses == null){
+ if(schedulerGsd.getAddresses() != null && schedulerGsd.getAddresses().get("socket") != null){
+ addresses = (InetSocketAddress[])schedulerGsd.getAddresses().get("socket").getObject();
+ }
+ }
+ //If not use the configuration and the bucket systems.
+ for( int i = 0; i < redundancy; i ++){
+ int bucket = (int)jobhandle.hashCode() % addresses.length;
//InetSocketAddress[] sockets = (InetSocketAddress[]) ((Address) schedulerGsd.getAddresses().get( "socket" )).getObject();
- InetSocketAddress socket = conf.getServices(grid)[bucket];
+ InetSocketAddress socket = addresses[bucket];
CommandImpl cmd = new CommandImpl( "Scheduler.scheduleJob",
Arrays.asList( new Object[]{ new ScheduledJob(jobhandle, job, ctx, trigger, null) } ) );
UuidJobHandle handle = (UuidJobHandle) sendMessage( this.conversationManager,
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerSocketConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerSocketConfiguration.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/timer/impl/SchedulerSocketConfiguration.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -42,48 +42,37 @@
MultiplexSocketService mss = grid.get( MultiplexSocketService.class );
- GridServiceDescription service = coreServicesWP.getServices().get( SchedulerService.class.getName() );
- if( service == null){
- coreServicesWP.getServices().put(SchedulerService.class.getName(), gsd);
- service = gsd;
- }
- Address address = null;
- if(service.getAddresses().get("socket") != null){
- address = service.getAddresses().get("socket");
- } else{
- address = service.addAddress( "socket" );
- }
- InetSocketAddress[] addresses = (InetSocketAddress[])address.getObject();
- if(addresses != null && addresses.length >= 1){
- InetSocketAddress[] newAddresses = new InetSocketAddress[addresses.length+1];
- if(addresses !=null){
- System.arraycopy(addresses, 0, newAddresses, 0, addresses.length);
- }
- newAddresses[addresses.length]= new InetSocketAddress( mss.getIp(),
- this.port);
- ServiceConfiguration conf = new SchedulerServiceConfiguration(newAddresses);
- service.setData(conf);
- }else{
- InetSocketAddress[] newAddress = new InetSocketAddress[1];
- newAddress[0]= new InetSocketAddress( mss.getIp(),
- this.port);
- address.setObject( newAddress );
- ServiceConfiguration conf = new SchedulerServiceConfiguration(newAddress);
- service.setData(conf);
- }
-
-// int newAddressesLenght = 1;
-// if(addresses != null){
-// newAddressesLenght = addresses.length + 1;
+// GridServiceDescription service = coreServicesWP.getServices().get( SchedulerService.class.getName() );
+// if( service == null){
+// coreServicesWP.getServices().put(SchedulerService.class.getName(), gsd);
+// service = gsd;
// }
-// InetSocketAddress[] newAddresses = new InetSocketAddress[newAddressesLenght];
-// if(addresses !=null){
-// System.arraycopy(addresses, 0, newAddresses, 0, addresses.length);
+// Address address = null;
+// if(service.getAddresses().get("socket") != null){
+// address = service.getAddresses().get("socket");
+// } else{
+// address = service.addAddress( "socket" );
// }
-// newAddresses[newAddressesLenght-1]= new InetSocketAddress( mss.getIp(),
+// InetSocketAddress[] addresses = (InetSocketAddress[])address.getObject();
+// if(addresses != null && addresses.length >= 1){
+// InetSocketAddress[] newAddresses = new InetSocketAddress[addresses.length+1];
+// if(addresses !=null){
+// System.arraycopy(addresses, 0, newAddresses, 0, addresses.length);
+// }
+// newAddresses[addresses.length]= new InetSocketAddress( mss.getIp(),
+// this.port);
+// ServiceConfiguration conf = new SchedulerServiceConfiguration(newAddresses);
+// service.setData(conf);
+// }else{
+// InetSocketAddress[] newAddress = new InetSocketAddress[1];
+// newAddress[0]= new InetSocketAddress( mss.getIp(),
// this.port);
-// address.setObject( newAddresses );
+// address.setObject( newAddress );
+// ServiceConfiguration conf = new SchedulerServiceConfiguration(newAddress);
+// service.setData(conf);
+// }
+
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/DistributedSchedulerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/DistributedSchedulerTest.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/DistributedSchedulerTest.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -1,6 +1,8 @@
package org.drools.grid.time.impl;
import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.drools.grid.timer.impl.UuidJobHandle;
import org.drools.grid.timer.impl.ScheduledJob;
import java.net.InetSocketAddress;
@@ -41,10 +43,12 @@
import org.drools.grid.io.impl.MultiplexSocketServiceCongifuration;
import org.drools.grid.remote.mina.MinaAcceptorFactoryService;
import org.drools.grid.service.directory.impl.CoreServicesWhitePagesConfiguration;
+import org.drools.grid.service.directory.impl.RegisterWhitePagesConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesRemoteConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
import org.drools.grid.timer.impl.CoreServicesSchedulerConfiguration;
+import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
import org.drools.grid.timer.impl.ScheduledJobConfiguration;
import org.drools.grid.timer.impl.SchedulerClient;
import org.drools.grid.timer.impl.SchedulerImpl;
@@ -61,10 +65,12 @@
private Map<String, GridServiceDescription> coreServicesMap;
@Override
public void setUp() {
+
}
@Override
public void tearDown() {
+ MockJob.counter = 0;
}
public void test1() throws Exception {
@@ -110,7 +116,7 @@
}
public void testDistributedJobSchedullingLocal() {
-
+
GridImpl grid = new GridImpl(new ConcurrentHashMap<String, Object>());
grid.addService(SchedulerService.class, new SchedulerImpl("myLocalSched",grid));
@@ -121,9 +127,15 @@
ScheduledJob sj2 = new ScheduledJob(handle, new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)), new ScheduledJobConfiguration(1));
scheduler.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
+ //The Job Will be executed in 1 second
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Logger.getLogger(DistributedSchedulerTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ assertEquals(1, MockJob.counter);
-
}
/*
@@ -168,17 +180,23 @@
//Schedule remotely the Job
scheduler.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Logger.getLogger(DistributedSchedulerTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ assertEquals(1, MockJob.counter);
//Close the peer connection
conn.close();
//Shutdown the MultiplexSocketService
grid1.get(MultiplexSocketService.class).close();
+
-
}
public void testMultipleSchedulersTest(){
@@ -207,16 +225,19 @@
Assert.assertEquals(2, ((InetSocketAddress[])((SchedulerServiceConfiguration)gsd.getData()).getServices(grid3)).length);
+ Assert.assertEquals(0, MockJob.counter);
conn.close();
grid1.get(MultiplexSocketService.class).close();
grid2.get(MultiplexSocketService.class).close();
+
+
}
public void testGetDataFromCoreServices(){
- coreServicesMap = new HashMap<String, GridServiceDescription>();//Hazelcast.newHazelcastInstance( null ).getMap( CoreServicesWhitePages.class.getName() );
+ coreServicesMap = new HashMap<String, GridServiceDescription>();//Hazelcast.newHazelcastInstance( null ).getMap( CoreServicesWhitePages.class.getName() );
//Grid View
GridImpl grid1 = new GridImpl(new ConcurrentHashMap<String, Object>());
@@ -244,17 +265,25 @@
SchedulerClient schedulerClient = new SchedulerClient(grid1,gsd, cm);
+ ((SchedulerServiceConfiguration)gsd.getData()).setRedundancy(3);
+ JobHandle handle = schedulerClient.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Logger.getLogger(DistributedSchedulerTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
- JobHandle handle = schedulerClient.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
-
-
+ assertEquals(3, MockJob.counter);
+
conn.close();
grid1.get(MultiplexSocketService.class).close();
grid2.get(MultiplexSocketService.class).close();
}
+
+
public static class MockJobContext implements JobContext, Serializable {
@@ -413,8 +442,9 @@
//Exposing Local WhitePages
GridPeerServiceConfiguration wpsc = new WhitePagesSocketConfiguration(port);
conf.addConfiguration(wpsc);
+ GridPeerServiceConfiguration registerwpincore = new RegisterWhitePagesConfiguration();
+ conf.addConfiguration(registerwpincore);
-
//Create a Local Scheduler
GridPeerServiceConfiguration schlConf = new SchedulerLocalConfiguration("myLocalSched");
conf.addConfiguration(schlConf);
@@ -423,6 +453,9 @@
// I need to use the same port to reuse the service multiplexer
GridPeerServiceConfiguration schlsc = new SchedulerSocketConfiguration(port);
conf.addConfiguration(schlsc);
+
+ GridPeerServiceConfiguration registerschedincore = new RegisterSchedulerConfiguration();
+ conf.addConfiguration(registerschedincore);
conf.configure(grid);
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/MockJob.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/MockJob.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/time/impl/MockJob.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -25,7 +25,9 @@
* @author salaboy
*/
public class MockJob implements Job {
+ public static int counter = 0; // execute() calls so far, shared by all instances; plain int, so increments are not atomic
public void execute(JobContext ctx) {
System.out.println("Job Executed!");
+ counter++; // record the run so tests can assert how many jobs fired
}
}
Added: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RegisterServicesTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RegisterServicesTest.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RegisterServicesTest.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.drools.io.mina;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import org.drools.SystemEventListener;
+import org.drools.SystemEventListenerFactory;
+import org.drools.grid.CoreServicesWhitePages;
+import org.drools.grid.GridPeerConfiguration;
+import org.drools.grid.GridPeerServiceConfiguration;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.MultiplexSocketService;
+import org.drools.grid.impl.GridImpl;
+import org.drools.grid.impl.MultiplexSocketServerImpl;
+import org.drools.grid.io.Connector;
+import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.ConversationManagerImpl;
+import org.drools.grid.io.impl.MultiplexSocketServiceCongifuration;
+import org.drools.grid.remote.mina.MinaAcceptorFactoryService;
+import org.drools.grid.remote.mina.MinaConnector;
+import org.drools.grid.service.directory.WhitePages;
+import org.drools.grid.service.directory.impl.CoreServicesWhitePagesConfiguration;
+import org.drools.grid.service.directory.impl.RegisterWhitePagesConfiguration;
+import org.drools.grid.service.directory.impl.WhitePagesClient;
+import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
+import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
+import org.drools.grid.time.impl.DistributedSchedulerTest.MockJobContext;
+import org.drools.grid.time.impl.DistributedSchedulerTest.MockTrigger;
+import org.drools.grid.time.impl.MockJob;
+import org.drools.grid.timer.impl.CoreServicesSchedulerConfiguration;
+import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
+import org.drools.grid.timer.impl.SchedulerClient;
+import org.drools.grid.timer.impl.SchedulerLocalConfiguration;
+import org.drools.grid.timer.impl.SchedulerSocketConfiguration;
+import org.drools.time.SchedulerService;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Configures one grid peer whose white pages and scheduler are exposed over a socket and
+ * checks that the exposed scheduler is registered, with data, in the core services white pages.
+ *
+ * @author salaboy
+ */
+public class RegisterServicesTest {
+
+    /** Single port used for every exposed service so they reuse the same service multiplexer. */
+    private static final int PORT = 5012;
+
+    private SystemEventListener l = SystemEventListenerFactory.getSystemEventListener();
+    private Map<String, GridServiceDescription> coreServicesMap;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Registers the socket-exposed white pages and scheduler in the core services white pages,
+     * then schedules a job through the local and through the exposed scheduler description.
+     * Cleanup runs in {@code finally} so a failing assertion cannot leave the port bound.
+     */
+    @Test
+    public void testRegisterInCoreServices() {
+        coreServicesMap = new HashMap<String, GridServiceDescription>();//Hazelcast.newHazelcastInstance( null ).getMap( CoreServicesWhitePages.class.getName() );
+        GridImpl grid = new GridImpl(new HashMap<String, Object>());
+        Connector conn = null;
+        try {
+            //Local Grid Configuration, for our client
+            GridPeerConfiguration conf = new GridPeerConfiguration();
+
+            //Configuring the Core Services White Pages
+            GridPeerServiceConfiguration coreSeviceWPConf = new CoreServicesWhitePagesConfiguration(coreServicesMap);
+            conf.addConfiguration(coreSeviceWPConf);
+
+            //Configuring the Core Services Scheduler
+            GridPeerServiceConfiguration coreSeviceSchedulerConf = new CoreServicesSchedulerConfiguration();
+            conf.addConfiguration(coreSeviceSchedulerConf);
+
+            //Configuring the MultiplexSocketService
+            GridPeerServiceConfiguration socketConf = new MultiplexSocketServiceCongifuration(new MultiplexSocketServerImpl("127.0.0.1",
+                    new MinaAcceptorFactoryService(),
+                    l));
+            conf.addConfiguration(socketConf);
+
+            //Configuring the WhitePages
+            GridPeerServiceConfiguration wplConf = new WhitePagesLocalConfiguration();
+            conf.addConfiguration(wplConf);
+
+            //Exposing Local WhitePages
+            GridPeerServiceConfiguration wpsc = new WhitePagesSocketConfiguration(PORT);
+            conf.addConfiguration(wpsc);
+
+            GridPeerServiceConfiguration registerwpincore = new RegisterWhitePagesConfiguration();
+            conf.addConfiguration(registerwpincore);
+
+            //Create a Local Scheduler
+            GridPeerServiceConfiguration schlConf = new SchedulerLocalConfiguration("myLocalSched");
+            conf.addConfiguration(schlConf);
+
+            //Expose it to the Grid so it can be accessed by different nodes
+            // I need to use the same port to reuse the service multiplexer
+            GridPeerServiceConfiguration schlsc = new SchedulerSocketConfiguration(PORT);
+            conf.addConfiguration(schlsc);
+
+            GridPeerServiceConfiguration registerschedincore = new RegisterSchedulerConfiguration();
+            conf.addConfiguration(registerschedincore);
+
+            conf.configure(grid);
+
+            //Local White Pages
+            WhitePages wp = grid.get(WhitePages.class);
+            //Local sched in Local WP
+            GridServiceDescription gsdLocalSched = wp.lookup(SchedulerService.class.getName());
+
+            //Get the CoreWhitePages
+            CoreServicesWhitePages corewp = grid.get(CoreServicesWhitePages.class);
+            //Get the registered Scheduler
+            GridServiceDescription gsdLocalButExposedSched = corewp.lookup(SchedulerService.class);
+            //Get the registered white pages
+            GridServiceDescription gsdLocalButExposedWp = corewp.lookup(WhitePages.class);
+
+            //The Scheduler is local = no addresses and no Data
+            Assert.assertEquals(0, gsdLocalSched.getAddresses().size());
+            Assert.assertNull(gsdLocalSched.getData());
+            //The Scheduler registered in the core white pages must carry Data
+            Assert.assertNotNull(gsdLocalButExposedSched.getData());
+
+            conn = new MinaConnector();
+            ConversationManager cm = new ConversationManagerImpl("s1", conn, l);
+            GridServiceDescription clientSched1 = wp.lookup(SchedulerService.class.getName());
+            SchedulerClient sched = new SchedulerClient(grid, clientSched1, cm);
+            sched.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
+
+            sched = new SchedulerClient(grid, gsdLocalButExposedSched, cm);
+            sched.scheduleJob(new MockJob(), new MockJobContext("xxx"), new MockTrigger(new Date(1000)));
+
+            //@TODO: FIX THIS! something weird is happening with the handlers..
+            //GridServiceDescription clientSched2 = new WhitePagesClient( gsdLocalButExposedWp, cm).lookup(SchedulerService.class.getName());
+        } finally {
+            // Close the connector first, then the socket service, even if the first close throws.
+            try {
+                if (conn != null) {
+                    conn.close();
+                }
+            } finally {
+                MultiplexSocketService socketService = grid.get(MultiplexSocketService.class);
+                if (socketService != null) {
+                    socketService.close();
+                }
+            }
+        }
+    }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RemoteWhitePagesTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RemoteWhitePagesTest.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/RemoteWhitePagesTest.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -25,6 +25,7 @@
import org.drools.grid.remote.mina.MinaConnector;
import org.drools.grid.service.directory.WhitePages;
import org.drools.grid.service.directory.impl.JpaWhitePages;
+import org.drools.grid.service.directory.impl.RegisterWhitePagesConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesRemoteConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
@@ -55,6 +56,9 @@
GridPeerServiceConfiguration wpsc = new WhitePagesSocketConfiguration(5012);
conf.addConfiguration( wpsc );
+
+ GridPeerServiceConfiguration registerwpincore = new RegisterWhitePagesConfiguration();
+ conf.addConfiguration(registerwpincore);
conf.configure( grid1 );
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/WhitePagesTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/WhitePagesTest.java 2010-11-01 16:14:05 UTC (rev 35819)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/io/mina/WhitePagesTest.java 2010-11-01 16:25:28 UTC (rev 35820)
@@ -23,6 +23,7 @@
import org.drools.grid.remote.mina.MinaAcceptorFactoryService;
import org.drools.grid.remote.mina.MinaConnector;
import org.drools.grid.service.directory.WhitePages;
+import org.drools.grid.service.directory.impl.RegisterWhitePagesConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesRemoteConfiguration;
import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
@@ -54,6 +55,9 @@
GridPeerServiceConfiguration wpsc = new WhitePagesSocketConfiguration(5012);
conf.addConfiguration( wpsc );
+
+ GridPeerServiceConfiguration registerwpincore = new RegisterWhitePagesConfiguration();
+ conf.addConfiguration(registerwpincore);
conf.configure( grid1 );
More information about the jboss-svn-commits
mailing list