2.1. Sort by Key

Algorithm that sorts the elements of a set of files and merges the partial results respecting the order.

First of all - Create a dataset

This step can be avoided if the dataset already exists.

If not, this code snipped creates a set of files with dictionary on each one generated randomly. Uses pickle.

[1]:
def datasetGenerator(directory, numFiles, numPairs):
    import random
    import pickle
    import os
    if os.path.exists(directory):
        print("Dataset directory already exists... Removing")
        import shutil
        shutil.rmtree(directory)
    os.makedirs(directory)
    for f in range(numFiles):
        fragment = {}
        while len(fragment) < numPairs:
            fragment[random.random()] = random.randint(0, 1000)
        filename = 'file_' + str(f) + '.data'
        with open(directory + '/' + filename, 'wb') as fd:
            pickle.dump(fragment, fd)
        print('File ' + filename + ' has been created.')
[2]:
numFiles = 2
numPairs = 10
directoryName = 'mydataset'
datasetGenerator(directoryName, numFiles, numPairs)
Dataset directory already exists... Removing
File file_0.data has been created.
File file_1.data has been created.
[3]:
# Show the files that have been created
%ls -l $directoryName
total 8
-rw-r--r-- 1 javier users 134 nov  8 16:18 file_0.data
-rw-r--r-- 1 javier users 135 nov  8 16:18 file_1.data

Algorithm definition

[4]:
import pycompss.interactive as ipycompss
[5]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True,
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000)
********************************************************
**************** PyCOMPSs Interactive ******************
********************************************************
*          .-~~-.--.           _____      __   ______  *
*         :         )         |____ \    /  | /  __  \ *
*   .~ ~ -.\       /.- ~~ .     ___) |  /_  | | |  | | *
*   >       `.   .'       <    / ___/     | | | |  | | *
*  (         .- -.         )  | |___   _  | | | |__| | *
*   `- -.-~  `- -'  ~-.- -'   |_____| |_| |_| \______/ *
*     (        :        )           _ _ .-:            *
*      ~--.    :    .--~        .-~  .-~  }            *
*          ~-.-^-.-~ \_      .~  .-~   .~              *
*                   \ \ '     \ '_ _ -~                *
*                    \`.\`.    //                      *
*           . - ~ ~-.__\`.\`-.//                       *
*       .-~   . - ~  }~ ~ ~-.~-.                       *
*     .' .-~      .-~       :/~-.~-./:                 *
*    /_~_ _ . - ~                 ~-.~-._              *
*                                     ~-.<             *
********************************************************
* - Starting COMPSs runtime...                         *
* - Log path : /home/javier/.COMPSs/InteractiveMode_18/
* - PyCOMPSs Runtime started... Have fun!              *
********************************************************
[6]:
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN
[7]:
@task(returns=list, dataFile=FILE_IN)
def sortPartition(dataFile):
    '''
    Reads the dataFile and sorts its content which is assumed to be a dictionary {K: V}
    :param path: file that contains the data
    :return: a list of (K, V) pairs sorted.
    '''
    import pickle
    import operator
    with open(dataFile, 'rb') as f:
        data = pickle.load(f)
    # res = sorted(data, key=lambda (k, v): k, reverse=not ascending)
    partition_result = sorted(data.items(), key=operator.itemgetter(0), reverse=False)
    return partition_result
[8]:
@task(returns=list, priority=True)
def reducetask(a, b):
    '''
    Merges two partial results (lists of (K, V) pairs) respecting the order
    :param a: Partial result a
    :param b: Partial result b
    :return: The merging result sorted
    '''
    partial_result = []
    i = 0
    j = 0
    while i < len(a) and j < len(b):
        if a[i] < b[j]:
            partial_result.append(a[i])
            i += 1
        else:
            partial_result.append(b[j])
            j += 1
    if i < len(a):
        partial_result + a[i:]
    elif j < len(b):
        partial_result + b[j:]
    return partial_result
[9]:
def merge_reduce(function, data):
    import sys
    if sys.version_info[0] >= 3:
        import queue as Queue
    else:
        import Queue
    q = Queue.Queue()
    for i in data:
        q.put(i)
    while not q.empty():
        x = q.get()
        if not q.empty():
            y = q.get()
            q.put(function(x, y))
        else:
            return x

MAIN

Parameters (that can be configured in the following cell): * datasetPath: The path where the dataset is (default: the same as created previously).

[10]:
import os
import time
from pycompss.api.api import compss_wait_on

datasetPath = directoryName  # Where the dataset is
files = []
for f in os.listdir(datasetPath):
    files.append(datasetPath + '/' + f)

startTime = time.time()

partialSorted = []
for f in files:
    partialSorted.append(sortPartition(f))
result = merge_reduce(reducetask, partialSorted)

result = compss_wait_on(result)

print("Elapsed Time(s)")
print(time.time() - startTime)
import pprint
pprint.pprint(result)
Found task: sortPartition
Found task: reducetask
Elapsed Time(s)
4.178942918777466
[(0.03924729695563789, 994),
 (0.054915200834496414, 446),
 (0.11869528840051136, 427),
 (0.16001242154838613, 199),
 (0.17092663780953166, 916),
 (0.171067840220539, 582),
 (0.17223874211700552, 774),
 (0.26587509273352294, 907),
 (0.4008727823694045, 490),
 (0.4923758729896406, 929),
 (0.5731241794090755, 636),
 (0.5775918513667089, 717),
 (0.6251251387143741, 755),
 (0.7286082191252214, 488),
 (0.7401772833272248, 489),
 (0.7958672050130505, 783),
 (0.8425515282625513, 195),
 (0.898361047779478, 198),
 (0.9138404527082103, 951)]
[11]:
ipycompss.stop()
********************************************************
*************** STOPPING PyCOMPSs ******************
********************************************************
Checking if any issue happened.
Warning: some of the variables used with PyCOMPSs may
         have not been brought to the master.
********************************************************