בהינתן מספר 'n' ו-n מספרים מיין את המספרים באמצעות במקביל מיזוג מיון. (רמז: נסה להשתמש בקריאות מערכת shget shmat).
חלק 1: האלגוריתם (איך?)
בצע באופן רקורסיבי שני תהליכי ילד אחד עבור החצי השמאלי אחד של החצי הימני. אם מספר האלמנטים במערך עבור תהליך קטן מ-5 בצעו a מיון הכנסה . לאחר מכן ההורה של שני הילדים ממזג את התוצאה וחוזר בחזרה להורה וכן הלאה. אבל איך עושים את זה במקביל?
חלק 2: ההגיוני (למה?)
החלק החשוב בפתרון לבעיה זו אינו אלגוריתמי אלא להסביר מושגים של מערכת הפעלה וקרנל.
כדי להשיג מיון במקביל אנו זקוקים לדרך לגרום לשני תהליכים לעבוד על אותו מערך בו זמנית. כדי להקל על הדברים לינוקס מספקת הרבה קריאות מערכת דרך נקודות קצה פשוטות של API. Two of them are shmget() (להקצאת זיכרון משותף) ו shmat() (לפעולות זיכרון משותף). אנו יוצרים מרחב זיכרון משותף בין תהליך הילד שאנו מזלגים. כל קטע מחולק לילד שמאל וימין אשר ממוין, החלק המעניין הוא שהם עובדים במקביל! ה-shmget() מבקש מהקרנל להקצות א עמוד משותף לשני התהליכים.
מדוע fork() המסורתי לא עובד?
התשובה טמונה במה fork() עושה בפועל. מהתיעוד 'fork() יוצר תהליך חדש על ידי שכפול תהליך הקריאה'. תהליך הילד ותהליך ההורה פועלים בחללי זיכרון נפרדים. בזמן fork() לשני חללי הזיכרון יש את אותו תוכן. זיכרון כותב קובץ-descriptor(fd) שינויים וכו' המבוצעים על ידי אחד מהתהליכים אינם משפיעים על השני. מכאן שאנו צריכים קטע זיכרון משותף.
#include #include #include #include #include #include #include #include void insertionSort(int arr[] int n); void merge(int a[] int l1 int h1 int h2); void mergeSort(int a[] int l int h) { int i len = (h - l + 1); // Using insertion sort for small sized array if (len <= 5) { insertionSort(a + l len); return; } pid_t lpid rpid; lpid = fork(); if (lpid < 0) { // Lchild proc not created perror('Left Child Proc. not createdn'); _exit(-1); } else if (lpid == 0) { mergeSort(a l l + len / 2 - 1); _exit(0); } else { rpid = fork(); if (rpid < 0) { // Rchild proc not created perror('Right Child Proc. not createdn'); _exit(-1); } else if (rpid == 0) { mergeSort(a l + len / 2 h); _exit(0); } } int status; // Wait for child processes to finish waitpid(lpid &status 0); waitpid(rpid &status 0); // Merge the sorted subarrays merge(a l l + len / 2 - 1 h); } /* Function to sort an array using insertion sort*/ void insertionSort(int arr[] int n) { int i key j; for (i = 1; i < n; i++) { key = arr[i]; j = i - 1; /* Move elements of arr[0..i-1] that are greater than key to one position ahead of their current position */ while (j >= 0 && arr[j] > key) { arr[j + 1] = arr[j]; j = j - 1; } arr[j + 1] = key; } } // Method to merge sorted subarrays void merge(int a[] int l1 int h1 int h2) { // We can directly copy the sorted elements // in the final array no need for a temporary // sorted array. int count = h2 - l1 + 1; int sorted[count]; int i = l1 k = h1 + 1 m = 0; while (i <= h1 && k <= h2) { if (a[i] < a[k]) sorted[m++] = a[i++]; else if (a[k] < a[i]) sorted[m++] = a[k++]; else if (a[i] == a[k]) { sorted[m++] = a[i++]; sorted[m++] = a[k++]; } } while (i <= h1) sorted[m++] = a[i++]; while (k <= h2) sorted[m++] = a[k++]; int arr_count = l1; for (i = 0; i < count; i++ l1++) a[l1] = sorted[i]; } // To check if array is actually sorted or not void isSorted(int arr[] int len) { if (len == 1) { std::cout << 'Sorting Done Successfully' << std::endl; return; } int i; for (i = 1; i < len; i++) { if (arr[i] < arr[i - 1]) { std::cout << 'Sorting Not Done' << std::endl; return; } } std::cout << 'Sorting Done Successfully' << std::endl; return; } // To fill random values in array for testing // purpose void fillData(int a[] int len) { // Create random arrays int i; for (i = 0; i < len; i++) a[i] = rand(); return; } // Driver code int main() { int shmid; key_t key = IPC_PRIVATE; int *shm_array; int length = 128; // Calculate segment length size_t SHM_SIZE = sizeof(int) * length; // Create the segment. if ((shmid = shmget(key SHM_SIZE IPC_CREAT | 0666)) < 0) { perror('shmget'); _exit(1); } // Now we attach the segment to our data space. if ((shm_array = (int *)shmat(shmid NULL 0)) == (int *)-1) { perror('shmat'); _exit(1); } // Create a random array of given length srand(time(NULL)); fillData(shm_array length); // Sort the created array mergeSort(shm_array 0 length - 1); // Check if array is sorted or not isSorted(shm_array length); /* Detach from the shared memory now that we are done using it. */ if (shmdt(shm_array) == -1) { perror('shmdt'); _exit(1); } /* Delete the shared memory segment. */ if (shmctl(shmid IPC_RMID NULL) == -1) { perror('shmctl'); _exit(1); } return 0; }
Java import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class ConcurrentMergeSort { // Method to merge sorted subarrays private static void merge(int[] a int low int mid int high) { int[] temp = new int[high - low + 1]; int i = low j = mid + 1 k = 0; while (i <= mid && j <= high) { if (a[i] <= a[j]) { temp[k++] = a[i++]; } else { temp[k++] = a[j++]; } } while (i <= mid) { temp[k++] = a[i++]; } while (j <= high) { temp[k++] = a[j++]; } System.arraycopy(temp 0 a low temp.length); } // RecursiveAction for fork/join framework static class SortTask extends RecursiveAction { private final int[] a; private final int low high; SortTask(int[] a int low int high) { this.a = a; this.low = low; this.high = high; } @Override protected void compute() { if (high - low <= 5) { Arrays.sort(a low high + 1); } else { int mid = low + (high - low) / 2; invokeAll(new SortTask(a low mid) new SortTask(a mid + 1 high)); merge(a low mid high); } } } // Method to check if array is sorted private static boolean isSorted(int[] a) { for (int i = 0; i < a.length - 1; i++) { if (a[i] > a[i + 1]) { return false; } } return true; } // Method to fill array with random numbers private static void fillData(int[] a) { Random rand = new Random(); for (int i = 0; i < a.length; i++) { a[i] = rand.nextInt(); } } public static void main(String[] args) { int length = 128; int[] a = new int[length]; fillData(a); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(new SortTask(a 0 a.length - 1)); if (isSorted(a)) { System.out.println('Sorting Done Successfully'); } else { System.out.println('Sorting Not Done'); } } }
Python3 import numpy as np import multiprocessing as mp import time def insertion_sort(arr): n = len(arr) for i in range(1 n): key = arr[i] j = i - 1 while j >= 0 and arr[j] > key: arr[j + 1] = arr[j] j -= 1 arr[j + 1] = key def merge(arr l mid r): n1 = mid - l + 1 n2 = r - mid L = arr[l:l + n1].copy() R = arr[mid + 1:mid + 1 + n2].copy() i = j = 0 k = l while i < n1 and j < n2: if L[i] <= R[j]: arr[k] = L[i] i += 1 else: arr[k] = R[j] j += 1 k += 1 while i < n1: arr[k] = L[i] i += 1 k += 1 while j < n2: arr[k] = R[j] j += 1 k += 1 def merge_sort(arr l r): if l < r: if r - l + 1 <= 5: insertion_sort(arr) else: mid = (l + r) // 2 p1 = mp.Process(target=merge_sort args=(arr l mid)) p2 = mp.Process(target=merge_sort args=(arr mid + 1 r)) p1.start() p2.start() p1.join() p2.join() merge(arr l mid r) def is_sorted(arr): for i in range(1 len(arr)): if arr[i] < arr[i - 1]: return False return True def fill_data(arr): np.random.seed(0) arr[:] = np.random.randint(0 1000 size=len(arr)) if __name__ == '__main__': length = 128 shm_array = mp.Array('i' length) fill_data(shm_array) start_time = time.time() merge_sort(shm_array 0 length - 1) end_time = time.time() if is_sorted(shm_array): print('Sorting Done Successfully') else: print('Sorting Not Done') print('Time taken:' end_time - start_time)
JavaScript // Importing required modules const { Worker isMainThread parentPort workerData } = require('worker_threads'); // Function to merge sorted subarrays function merge(a low mid high) { let temp = new Array(high - low + 1); let i = low j = mid + 1 k = 0; while (i <= mid && j <= high) { if (a[i] <= a[j]) { temp[k++] = a[i++]; } else { temp[k++] = a[j++]; } } while (i <= mid) { temp[k++] = a[i++]; } while (j <= high) { temp[k++] = a[j++]; } for (let p = 0; p < temp.length; p++) { a[low + p] = temp[p]; } } // Function to check if array is sorted function isSorted(a) { for (let i = 0; i < a.length - 1; i++) { if (a[i] > a[i + 1]) { return false; } } return true; } // Function to fill array with random numbers function fillData(a) { for (let i = 0; i < a.length; i++) { a[i] = Math.floor(Math.random() * 1000); } } // Function to sort the array using merge sort function sortArray(a low high) { if (high - low <= 5) { a.sort((a b) => a - b); } else { let mid = low + Math.floor((high - low) / 2); sortArray(a low mid); sortArray(a mid + 1 high); merge(a low mid high); } } // Main function function main() { let length = 128; let a = new Array(length); fillData(a); sortArray(a 0 a.length - 1); if (isSorted(a)) { console.log('Sorting Done Successfully'); } else { console.log('Sorting Not Done'); } } main();
תְפוּקָה:
Sorting Done Successfully
מורכבות זמן:O(N log N)
מרחב עזר:O(N)
שיפורי ביצועים?
נסה לתזמן את הקוד ולהשוות את הביצועים שלו לקוד הרציף המסורתי. תופתעו לדעת שביצועי המיון הרציפים טובים יותר!
כשנגיד ילד שמאלי גישה למערך השמאלי המערך נטען למטמון של מעבד. כעת כאשר ניגש למערך הימני (בגלל גישה מקבילה) יש פספוס מטמון מכיוון שהמטמון מלא בקטע שמאלי ואז הקטע הימני מועתק לזיכרון המטמון. תהליך הלוך ושוב זה ממשיך והוא מוריד את הביצועים לרמה כזו שהוא מתפקד גרוע יותר מהקוד הרציף.
ישנן דרכים לצמצם את החמצות המטמון על ידי שליטה בזרימת העבודה של הקוד. אבל אי אפשר להימנע מהם לחלוטין!