Here is an example of how you can perform MergeSort on a distributed grid product like GridGain. This example is somewhat artificial, as you probably would never do the same thing in real life (executing the same code locally is most likely faster), but it does demonstrate some pretty cool features of GridGain, like recursive task execution and continuations.
This is the task class which splits array in two and sends remote jobs to sort the new arrays. Remote jobs in their turn execute the same task over and over again until we get to array size of 1, after which we begin merge process.
This is the task class which splits array in two and sends remote jobs to sort the new arrays. Remote jobs in their turn execute the same task over and over again until we get to array size of 1, after which we begin merge process.
class GridMergeSortTask extends GridTaskSplitAdapter<int[], int[]> { // Injected Grid instance. @GridInstanceResource private Grid grid; @Override protected Collection<GridJob> split(int gridSize, int[] initArr) { Collection<GridJob> jobs = new LinkedList<GridJob>(); for (final int[] arr : splitArray(initArr)) { jobs.add(new GridJobAdapterEx() { // Auto-inject job context. @GridJobContextResource private GridJobContext jobCtx; // Task execution result future. private GridTaskFuture<int[]> fut; @Override public Object execute() throws GridException { if (arr.length == 1) return arr; // Future is null before holdcc() is called and // not null after callcc() is called. if (fut == null) { // Launch the recursive child task asynchronously. fut = grid.execute(new GridMergeSortTask(), arr); // Add a listener to the future, that will resume the // parent task once the child one is completed. fut.listenAsync(new GridInClosure<GridFuture<int[]>>() { @Override public void apply(GridFuture<int[]> fut) { // CONTINUATION: // ============= // Resume suspended job execution. jobCtx.callcc(); } }); // CONTINUATION: // ============= // Suspend job execution to be continued later and // release the executing thread. return jobCtx.holdcc(); } else { assert fut.isDone(); // Return the result of a completed child task. return fut.get(); } } }); } return jobs; } /** * GridTask reduce logic. This method is called when both child jobs * are completed, and is a Reduce step of Merge Sort algorithm. */ @Override public int[] reduce(List<GridJobResult> results) { // This is in case we have a single-element array. if (results.size() == 1) return results.get(0).getData(); assert results.size() == 2; int[] arr1 = results.get(0).getData(); int[] arr2 = results.get(1).getData(); return mergeArrays(arr1, arr2); } private static Iterable<int[]> splitArray(int[] arr) { int len1 = arr.length / 2; int len2 = len1 + arr.length % 2; int[] a1 = new int[len1]; int[] a2 = new int[len2]; System.arraycopy(arr, 0, a1, 0, len1); System.arraycopy(arr, len1, a2, 0, len2); System.out.println("Split array [arr1Len=" + a1.length + ", arr2Len=" + a2.length + ']'); return Arrays.asList(a1, a2); } private static int[] mergeArrays(int[] arr1, int[] arr2) { int[] ret = new int[arr1.length + arr2.length]; int i1 = 0; int i2 = 0; // Merge 2 arrays into a resulting array for (int i = 0; i < ret.length; i++) { if (i1 >= arr1.length) { System.arraycopy(arr2, i2, ret, i, arr2.length - i2); break; } else if (i2 >= arr2.length) { System.arraycopy(arr1, i1, ret, i, arr1.length - i1); break; } else ret[i] = arr1[i1] <= arr2[i2] ? arr1[i1++] : arr2[i2++]; } System.out.println("Merged arrays [resLen=" + ret.length + ", arr1Len=" + arr1.length + ", arr2Len=" + arr2.length + ']'); return ret; } }And here is how you would call this task:
public static void main(String[] args) throws GridException { Grid grid = G.start(); try { int[] inArr = generateRandomArray(30); System.out.println("Unsorted array: " + Arrays.toString(inArr)); int[] outArr = grid.execute(new GridMergeSortTask(), inArr).get(); System.out.println("Sorted array: " + Arrays.toString(outArr)); } finally { G.stop(true); } } private static int[] generateRandomArray(int size) { int[] ret = new int[size]; Random rnd = new Random(); for (int i = 0; i < ret.length; i++) ret[i] = rnd.nextInt(100); return ret; }Enjoy!
Add a comment