Introduction to Spark

Monte Lunacek

Landscape of Distributed Computing

How do you process hundreds of gigabytes of data?

  • Filtering unstructured data
  • Aggregation
  • Large-scale machine learning
  • Graph analysis

Outline

  • Functional programming in Python
  • Hadoop's MapReduce
  • Spark's programming model
  • As many examples as we can get through!

Functional Python

Python acquired lambda, reduce, filter and map, courtesy of a Lisp hacker who missed them and submitted working patches. -Guido van Rossum

We will use these concepts (and more) in Spark

The map abstraction

In [1]:
def square(x):
    return x*x

numbers = [1,2,3]

def map_squares(nums):
    res = []
    for x in nums:
        res.append( square(x) )
    return res

or...

In [2]:
results = map(square, numbers)

For parallel computing in Python, map is a key abstraction.

In [3]:
from multiprocessing import Pool
pool = Pool(5)
results = pool.map(square, numbers)

lambda

Anonymous function: a function without a name

In [4]:
lambda_square = lambda x: x*x
map(lambda_square, range(10))
Out[4]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
In [5]:
map(lambda x: x*x, range(10))
Out[5]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
In [6]:
res = map(lambda x: x*x, range(10))
print res
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

reduce

Apply a function of two arguments cumulatively to the items of a container, reducing it to a single value.

In [7]:
def add_num(x1, x2):
    return x1+x2

print reduce(add_num, res)
285

In [8]:
print reduce(lambda x,y: x+y, res)
285

filter

Constructs a new list from the items for which the applied function returns True.

In [9]:
def greater_than(x):
    return x>10

filter(greater_than, res)
Out[9]:
[16, 25, 36, 49, 64, 81]
In [10]:
filter(lambda x: x>10, res)
Out[10]:
[16, 25, 36, 49, 64, 81]

Data Storage

Hadoop: The Definitive Guide by Tom White

  • Disk access speeds are not keeping up with growth in capacity
  • HDFS motivation: a single drive reads on the order of 100 MB/s (see the rough calculation below)
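
A rough back-of-the-envelope calculation (illustrative numbers only, not taken from the book) shows why reading from many disks in parallel matters:

data_mb = 100 * 1000              # 100 GB expressed in MB
rate_mb_per_s = 100.              # rough throughput of a single drive
print data_mb / rate_mb_per_s             # ~1000 seconds, about 17 minutes on one drive
print data_mb / (rate_mb_per_s * 100)     # ~10 seconds if the work is spread across 100 drives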

MapReduce

MapReduce: Simplified Data Processing on Large Clusters

A programming model for working with distributed data: a map step emits key/value pairs, the framework groups the pairs by key, and a reduce step aggregates each group (see the toy sketch below).

  • Inefficiencies with MapReduce
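
A toy sketch of the map/shuffle/reduce pattern in plain Python (an illustration of the model only, not how Hadoop itself runs):

from itertools import groupby
from operator import itemgetter

records = ['to be', 'or not', 'to be']

# map: emit a (word, 1) pair for every word in every record
mapped = [(word, 1) for line in records for word in line.split()]

# shuffle: group the pairs by key (sort first so groupby sees equal keys together)
grouped = groupby(sorted(mapped, key=itemgetter(0)), key=itemgetter(0))

# reduce: sum the counts within each group
counts = [(key, sum(v for _, v in pairs)) for key, pairs in grouped]
print counts   # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]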

Spark

Overview

  • Spark: same code runs locally or on a cluster.

Spark Programming Model

Everything starts with a SparkContext

In [11]:
from pyspark import  SparkContext

if 'sc' not in globals():
    sc = SparkContext(CLUSTER_URL,'example')

This gist by Fernando Perez explains how to initialize CLUSTER_URL during IPython startup (see the sketch below). It can be:

  • local
  • the URL of a distributed cluster
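
One way to do this (a sketch of the approach, not the gist's exact code; the file name and environment variable below are placeholders) is a small IPython profile startup file that falls back to local mode:

# hypothetical file: ~/.ipython/profile_pyspark/startup/00-cluster-url.py
import os

# use the master URL exported by the job script, or run locally if it is not set
CLUSTER_URL = os.environ.get('SPARK_MASTER_URL', 'local')
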
In [12]:
print CLUSTER_URL # could be 'local'
spark://node1239:7077

Create RDDs

RDD Documentation

The parallelize method is a utility for initializing RDDs.

  • Not efficient for large data (it writes the data to a file and reads it back in).
In [13]:
import numpy as np

rdd = sc.parallelize(np.arange(20), numSlices=5)

Transformations and Actions

Actions return values

  • collect
  • reduce
  • take
  • count

Transformations return pointers to new RDDs and are evaluated lazily (see the sketch after this list)

  • map, flatMap
  • reduceByKey
  • filter
  • glom
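
Because transformations are lazy, map and filter only record the operation; nothing runs until an action forces evaluation. A minimal sketch, assuming the sc created above:

nums = sc.parallelize(range(1000))

squares = nums.map(lambda x: x * x)            # returns immediately; no work happens yet
evens = squares.filter(lambda x: x % 2 == 0)   # still lazy

print evens.count()   # the action triggers the whole pipeline; prints 500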

What does this look like?

  • glom: Returns an RDD built by coalescing the elements of each partition into a list.
  • collect: Returns all elements of the RDD as a list on the driver.
In [14]:
for x in rdd.glom().collect():
    print x
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14, 15]
[16, 17, 18, 19]

In [15]:
rdd = sc.parallelize(np.arange(20), numSlices=10)
for x in rdd.glom().collect():
    print x
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
[16, 17]
[18, 19]

map and flatMap

flatMap returns a new RDD by first applying a function to each element and then flattening the results; map keeps the nested structure.

In [16]:
rdd = sc.parallelize([ [2, 3, 4],[0, 1],[5, 6, 7, 8] ])
rdd.collect()
Out[16]:
[[2, 3, 4], [0, 1], [5, 6, 7, 8]]
In [17]:
rdd.map(lambda x: range(len(x))).collect()
Out[17]:
[[0, 1, 2], [0, 1], [0, 1, 2, 3]]

Or I can flatten the results...

In [18]:
rdd.flatMap(lambda x: range(len(x))).collect()
Out[18]:
[0, 1, 2, 0, 1, 0, 1, 2, 3]

Or flatten the original results

In [19]:
rdd.flatMap(lambda x: x).collect()
Out[19]:
[2, 3, 4, 0, 1, 5, 6, 7, 8]

Reduction

In [20]:
rdd.flatMap(lambda x: x).reduce(lambda x,y: x+y)
Out[20]:
36
In [21]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.collect()
Out[21]:
[('a', 1), ('b', 1), ('a', 2)]
In [22]:
rdd.reduceByKey(lambda x,y: x+y).collect()
Out[22]:
[('b', 1), ('a', 3)]
In [23]:
rdd = sc.parallelize([("hamlet", 1), ("claudius", 1), ("hamlet", 1)])
In [24]:
rdd.countByKey()
Out[24]:
defaultdict(<type 'int'>, {'claudius': 1, 'hamlet': 2})

Send the Computation to the Data

Create some work to do.

In [1]:
import time
import os

def work(x):
    start_time = time.time()
    time.sleep(x)
    end_time =  time.time()
    return {'id': os.getpid(), 'start': start_time, 'end_time': end_time}   

Create a list of work times.

In [2]:
import numpy as np

np.random.seed(1045)
job_times = np.random.uniform(0.4, 0.6, 240)

How long should it take?

In [3]:
print 'Estimated serial time = {0:0.2f}'.format(job_times.sum())
print "Amdahl's parallel time = {0:0.2f}".format(job_times.sum()/12.)
Estimated serial time = 119.80
Amdahl's parallel time = 9.98

Helper plot function.

In [4]:
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd

def plot_workflow(results):
    res = pd.DataFrame(results)
    ids = list(set(res['id']))
    id_dic = dict( [k,v+0.65] for k,v in zip(ids, range(len(ids))))
    fig, ax = plt.subplots(figsize=(8, 6))

    tmin = res['start'].min()    
    for i in res.index:
        x_start = res.ix[i]['start'] - tmin
        x_end = res.ix[i]['end_time'] - tmin - x_start
        x_id = id_dic[res.ix[i]['id']]
        ax.add_patch(plt.Rectangle((x_start, x_id), 
                                   x_end, 0.8, 
                                   alpha=0.5, 
                                   color='grey'))
    
    ax.set_ylim(0.5, len(ids)+0.5)
    ax.set_xlim(0, res['end_time'].max() - tmin)
    ax.set_ylabel("Worker")
    ax.set_xlabel("seconds")

Run in parallel with Spark

In [5]:
from pyspark import  SparkContext
sc = SparkContext( CLUSTER_URL, 'pyspark')
In [6]:
jobs = sc.parallelize(job_times)
print jobs.count()
results = jobs.map(work)
%time res = results.collect()
240
CPU times: user 4 ms, sys: 3 ms, total: 7 ms
Wall time: 10.6 s

In [7]:
plot_workflow(res)

Simple Aggregation

In [1]:
import numpy as np

data = np.arange(1000).reshape(100,10)
print data.shape
(100, 10)

Pandas

In [2]:
import pandas as pd

tmp = pd.DataFrame(data, 
             columns=['x{0}'.format(i) for i in range(data.shape[1])])
tmp.head()
Out[2]:
x0 x1 x2 x3 x4 x5 x6 x7 x8 x9
0 0 1 2 3 4 5 6 7 8 9
1 10 11 12 13 14 15 16 17 18 19
2 20 21 22 23 24 25 26 27 28 29
3 30 31 32 33 34 35 36 37 38 39
4 40 41 42 43 44 45 46 47 48 49

5 rows × 10 columns

What is the row sum?

In [3]:
tmp.sum(axis=1)
Out[3]:
0       45
1      145
2      245
3      345
4      445
5      545
6      645
7      745
8      845
9      945
10    1045
11    1145
12    1245
13    1345
14    1445
...
85    8545
86    8645
87    8745
88    8845
89    8945
90    9045
91    9145
92    9245
93    9345
94    9445
95    9545
96    9645
97    9745
98    9845
99    9945
Length: 100, dtype: int64

Column sum?

In [4]:
tmp.sum(axis=0)
Out[4]:
x0    49500
x1    49600
x2    49700
x3    49800
x4    49900
x5    50000
x6    50100
x7    50200
x8    50300
x9    50400
dtype: int64
In [5]:
tmp.to_csv('numbers.csv', index=False)

Spark

In [6]:
from pyspark import  SparkContext

if 'sc' not in globals():
    sc = SparkContext( CLUSTER_URL, 'pyspark')
In [7]:
lines = sc.textFile('numbers.csv')
for l in lines.take(2):
    print l
x0,x1,x2,x3,x4,x5,x6,x7,x8,x9
0,1,2,3,4,5,6,7,8,9

How do we skip the header?

In [8]:
lines = lines.filter(lambda x: x.find('x') != 0)
for l in lines.take(2):
    print l
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19

In [9]:
data = lines.map(lambda x: x.split(','))
data.take(3)
Out[9]:
[[u'0', u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'8', u'9'],
 [u'10', u'11', u'12', u'13', u'14', u'15', u'16', u'17', u'18', u'19'],
 [u'20', u'21', u'22', u'23', u'24', u'25', u'26', u'27', u'28', u'29']]

Row Sum

In [14]:
def row_sum(x):
    int_x = map(int, x)
    return sum(int_x)

data_row_sum = data.map(row_sum)

print data_row_sum.collect()
print data_row_sum.count()
[45, 145, 245, 345, 445, 545, 645, 745, 845, 945, 1045, 1145, 1245, 1345, 1445, 1545, 1645, 1745, 1845, 1945, 2045, 2145, 2245, 2345, 2445, 2545, 2645, 2745, 2845, 2945, 3045, 3145, 3245, 3345, 3445, 3545, 3645, 3745, 3845, 3945, 4045, 4145, 4245, 4345, 4445, 4545, 4645, 4745, 4845, 4945, 5045, 5145, 5245, 5345, 5445, 5545, 5645, 5745, 5845, 5945, 6045, 6145, 6245, 6345, 6445, 6545, 6645, 6745, 6845, 6945, 7045, 7145, 7245, 7345, 7445, 7545, 7645, 7745, 7845, 7945, 8045, 8145, 8245, 8345, 8445, 8545, 8645, 8745, 8845, 8945, 9045, 9145, 9245, 9345, 9445, 9545, 9645, 9745, 9845, 9945]
100

Column Sum

In [19]:
def col_key(x):
    for i, value in enumerate(x):
       yield (i, int(value))

tmp = data.flatMap(col_key)
tmp.take(3)
Out[19]:
[(0, 0), (1, 1), (2, 2)]
In [20]:
tmp = tmp.groupByKey()
for i in tmp.take(2):
    print i
(0, [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250, 260, 270, 280, 290, 300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 410, 420, 430, 440, 450, 460, 470, 480, 490, 500, 510, 520, 530, 540, 550, 560, 570, 580, 590, 600, 610, 620, 630, 640, 650, 660, 670, 680, 690, 700, 710, 720, 730, 740, 750, 760, 770, 780, 790, 800, 810, 820, 830, 840, 850, 860, 870, 880, 890, 900, 910, 920, 930, 940, 950, 960, 970, 980, 990])
(1, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 111, 121, 131, 141, 151, 161, 171, 181, 191, 201, 211, 221, 231, 241, 251, 261, 271, 281, 291, 301, 311, 321, 331, 341, 351, 361, 371, 381, 391, 401, 411, 421, 431, 441, 451, 461, 471, 481, 491, 501, 511, 521, 531, 541, 551, 561, 571, 581, 591, 601, 611, 621, 631, 641, 651, 661, 671, 681, 691, 701, 711, 721, 731, 741, 751, 761, 771, 781, 791, 801, 811, 821, 831, 841, 851, 861, 871, 881, 891, 901, 911, 921, 931, 941, 951, 961, 971, 981, 991])

In [21]:
data_col_sum = tmp.map(lambda x: sum(x[1]))
for i in data_col_sum.take(2):
    print i
49500
49600

In [22]:
print data_col_sum.collect()
print data_col_sum.count()
[49500, 49600, 49700, 49800, 49900, 50000, 50100, 50200, 50300, 50400]
10
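
As a side note, groupByKey materializes every value for a key before the sum; the same column sums can be computed with reduceByKey (seen earlier), which combines values within each partition before the shuffle. A sketch, reusing the data RDD and col_key defined above:

# equivalent to the groupByKey version, but values are summed as they are combined
data_col_sum2 = data.flatMap(col_key).reduceByKey(lambda x, y: x + y)
print data_col_sum2.collect()   # (column, sum) pairs, e.g. (0, 49500) ... (9, 50400); order may vary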

Word Count

In [5]:
%%writefile awsdata.py
import os
import urllib

def get(f):
    filename = f.split('/')[-1]
    if not os.path.exists(filename):
        print 'fetching from aws'
        urllib.urlretrieve(f, filename=filename)
    return filename
Overwriting awsdata.py

In [6]:
import awsdata
hamlet_file = awsdata.get('https://s3.amazonaws.com/research_computing_tutorials/hamlet.txt')
hamlet = open(hamlet_file).read()
print hamlet_file
hamlet.txt

Standard Python

In [17]:
import re

words = re.split('\W+', hamlet.lower().strip())
print words[:10]
['hamlet', 'dramatis', 'personae', 'claudius', 'king', 'of', 'denmark', 'king', 'claudius', 'hamlet']

In [18]:
words = filter(lambda x: len(x)>2, words)
print words[:10]
['hamlet', 'dramatis', 'personae', 'claudius', 'king', 'denmark', 'king', 'claudius', 'hamlet', 'son']

In [35]:
wc = dict()

def add_to_dic(word):
    wc[word] = wc.get(word,0) + 1

# map the words
map(add_to_dic, words)

#sort by frequency
top_words = sorted(wc.items(), key=lambda x: x[1], reverse=True)[:15]
In [36]:
top_words
Out[36]:
[('the', 930),
 ('and', 843),
 ('you', 496),
 ('hamlet', 368),
 ('that', 346),
 ('lord', 278),
 ('not', 262),
 ('his', 238),
 ('this', 238),
 ('with', 232),
 ('but', 232),
 ('for', 215),
 ('your', 211),
 ('what', 186),
 ('king', 171)]
In [45]:
%matplotlib inline
import matplotlib.pyplot as plt

def plot(words):
    values = map(lambda x: x[1], words)
    labels = map(lambda x: x[0], words)
    plt.barh(range(len(values)), values, color='grey')
    plt.yticks(range(len(values)), labels)
    plt.show()
In [46]:
plot(top_words)

Spark

In [60]:
from pyspark import  SparkContext

if 'sc' not in globals():
    # CLUSTER_URL set during startup
    sc = SparkContext( CLUSTER_URL, 'pyspark')
In [73]:
words = sc.textFile('hamlet.txt')
words.take(5)
Out[73]:
[u'\tHAMLET', u'', u'', u'\tDRAMATIS PERSONAE', u'']
In [74]:
hamlet = words.flatMap(lambda line: re.split('\W+', line.lower().strip()))
hamlet.take(5)
Out[74]:
[u'hamlet', u'', u'', u'dramatis', u'personae']
In [75]:
tmp = hamlet.filter(lambda x: len(x) > 2 )
print tmp.take(5)
[u'hamlet', u'dramatis', u'personae', u'claudius', u'king']

In [76]:
tmp = tmp.map(lambda word: (word, 1))
tmp.take(5)
Out[76]:
[(u'hamlet', 1),
 (u'dramatis', 1),
 (u'personae', 1),
 (u'claudius', 1),
 (u'king', 1)]
In [77]:
tmp = tmp.reduceByKey(lambda a, b: a + b)
tmp.take(5)
Out[77]:
[(u'partial', 1), (u'lean', 1), (u'peruse', 1), (u'leads', 1), (u'sleep', 11)]
In [78]:
tmp = tmp.map(lambda x: (x[1], x[0])).sortByKey(False)
tmp.take(20)
Out[78]:
[(930, u'the'),
 (843, u'and'),
 (496, u'you'),
 (368, u'hamlet'),
 (346, u'that'),
 (278, u'lord'),
 (262, u'not'),
 (238, u'his'),
 (238, u'this'),
 (232, u'with'),
 (232, u'but'),
 (215, u'for'),
 (211, u'your'),
 (186, u'what'),
 (171, u'king'),
 (160, u'him'),
 (149, u'have'),
 (134, u'will'),
 (124, u'polonius'),
 (112, u'horatio')]
In [79]:
tmp = tmp.map(lambda x: (x[1], x[0]))
tmp.take(20)
Out[79]:
[(u'the', 930),
 (u'and', 843),
 (u'you', 496),
 (u'hamlet', 368),
 (u'that', 346),
 (u'lord', 278),
 (u'not', 262),
 (u'his', 238),
 (u'this', 238),
 (u'with', 232),
 (u'but', 232),
 (u'for', 215),
 (u'your', 211),
 (u'what', 186),
 (u'king', 171),
 (u'him', 160),
 (u'have', 149),
 (u'will', 134),
 (u'polonius', 124),
 (u'horatio', 112)]
In [70]:
plot(tmp.take(15))
In [71]:
plot(top_words)

Summary code

In [72]:
words = sc.textFile('hamlet.txt')\
        .flatMap(lambda line: re.split('\W+', line.lower().strip()))\
        .filter(lambda x: len(x) > 2 )\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a + b)\
        .map(lambda x: (x[1], x[0])).sortByKey(False)   

words.take(15)
Out[72]:
[(930, u'the'),
 (843, u'and'),
 (496, u'you'),
 (368, u'hamlet'),
 (346, u'that'),
 (278, u'lord'),
 (262, u'not'),
 (238, u'his'),
 (238, u'this'),
 (232, u'with'),
 (232, u'but'),
 (215, u'for'),
 (211, u'your'),
 (186, u'what'),
 (171, u'king')]

Text filtering

In [80]:
words = sc.textFile('hamlet.txt')

How many lines contain 'hamlet'?

In [82]:
tmp = words.filter(lambda x: "hamlet" in x.lower())
tmp.count()
Out[82]:
367

As a percent?

In [91]:
'{0:0.2f}%'.format(100*tmp.count()/float(words.count()))
Out[91]:
'7.26%'

How many of those lines also contain 'claudius'?

In [93]:
tmp.filter( lambda x: 'claudius' in x.lower()).count()
Out[93]:
12
In [94]:
tmp.filter( lambda x: 'claudius' in x.lower()).collect()
Out[94]:
[u'\t[Enter KING CLAUDIUS, QUEEN GERTRUDE, HAMLET,',
 u"KING CLAUDIUS\tThough yet of Hamlet our dear brother's death",
 u"KING CLAUDIUS\t'Tis sweet and commendable in your nature, Hamlet,",
 u'KING CLAUDIUS\tHow fares our cousin Hamlet?',
 u'KING CLAUDIUS\tI have nothing with this answer, Hamlet; these words',
 u'KING CLAUDIUS\tWhat, Gertrude? How does Hamlet?',
 u"KING CLAUDIUS\tNow, Hamlet, where's Polonius?",
 u'KING CLAUDIUS\tHamlet, this deed, for thine especial safety,--',
 u'KING CLAUDIUS\tAy, Hamlet.',
 u'KING CLAUDIUS\tThy loving father, Hamlet.',
 u'KING CLAUDIUS\tFrom Hamlet! who brought them?',
 u"KING CLAUDIUS\t'Tis Hamlets character. 'Naked!"]

K-means

Based on the Spark examples.

In [271]:
import os
import numpy as np

Create input file.

In [272]:
N = 5
num_points = 100

set1 = np.reshape(5 + np.random.randn(num_points*N), (num_points,N))
set2 = np.reshape(10 + np.random.randn(num_points*N), (num_points,N))
set3 = np.reshape(15 + np.random.randn(num_points*N), (num_points,N))

x = np.concatenate([set1, set2, set3])

np.savetxt('points.txt', x, delimiter=',', fmt='%10.5f') 

Using sklearn

In [273]:
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import euclidean_distances

data = np.loadtxt('points.txt', delimiter=',')
k_means = KMeans(init='k-means++', n_clusters=3, n_init=10)
k_means.fit(data)
Out[273]:
KMeans(copy_x=True, init='k-means++', max_iter=300, n_clusters=3, n_init=10,
    n_jobs=1, precompute_distances=True, random_state=None, tol=0.0001,
    verbose=0)
In [274]:
def plot(centers):
    %matplotlib inline
    import matplotlib.pyplot as plt
    for i in range(3):
        plt.plot(centers[i])
In [275]:
plot(k_means.cluster_centers_)

Using numpy

In [251]:
def closest_point(p, centers):  
    dist = map(lambda x: np.sum((p - x) ** 2), centers)
    return np.argmin(dist)

What is the closest center?

In [276]:
from functools import partial

centers = data[:3].copy()
closest_center = partial(closest_point, centers=centers)
tmp = map(closest_center, data)

print tmp[:10]
[0, 1, 2, 0, 1, 2, 1, 1, 1, 1]

Iterate

In [277]:
k_points = data[:3].copy()

for iteration in range(30):
    closest_center = partial(closest_point, centers=k_points)
    tmp = np.array(map(closest_center, data))
    for i in range(len(k_points)):
        k_points[i] = np.mean(data[tmp==i], axis=0)
In [278]:
plot(k_points)

Spark

In [279]:
from pyspark import  SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '20g')

if 'sc' not in globals():
    sc = SparkContext( CLUSTER_URL, 'pyspark')

Convert line to numpy array of floats.

In [280]:
def parseVector(line):
    return np.array([float(x) for x in line.strip().split(',')])
In [281]:
lines = sc.textFile('points.txt')
lines.take(2)
Out[281]:
[u'   3.94795,   5.24514,   4.21983,   5.82014,   3.56495',
 u'   4.12071,   5.60875,   4.98164,   3.79047,   5.37827']
In [282]:
data = lines.map(parseVector).cache()
print data.count()
300

In [283]:
data.first()
Out[283]:
array([ 3.94795,  5.24514,  4.21983,  5.82014,  3.56495])
In [284]:
k_points = data.take(3)
temp_dist = 1.0

closest = data.map(
    lambda p : (closest_point(p, k_points), (p, 1)))
closest.take(2)
Out[284]:
[(0, (array([ 3.94795,  5.24514,  4.21983,  5.82014,  3.56495]), 1)),
 (1, (array([ 4.12071,  5.60875,  4.98164,  3.79047,  5.37827]), 1))]
In [285]:
point_stats = closest.reduceByKey(
    lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
point_stats.take(2)
Out[285]:
[(0,
  (array([ 203.28924,  195.7836 ,  188.10686,  233.8193 ,  178.81389]), 40)),
 (1,
  (array([ 2758.36735,  2753.10089,  2742.10208,  2743.79008,  2764.49942]),
   251))]
In [286]:
new_points = point_stats.map(
    lambda (x, (y, z)): (x, y / z)).collect()
print new_points
[(0, array([ 5.082231  ,  4.89459   ,  4.7026715 ,  5.8454825 ,  4.47034725])), (1, array([ 10.98951135,  10.96852944,  10.92470948,  10.93143458,  11.01394191])), (2, array([ 5.15993444,  3.87556556,  4.26854111,  4.40604556,  5.57423444]))]

In [287]:
temp_dist = sum(map(lambda (x,y): np.sum((k_points[x] - y) ** 2), 
                new_points))

for (x, y) in new_points:
    k_points[x] = y
In [288]:
for x in k_points:
    print x[:5]
[ 5.082231    4.89459     4.7026715   5.8454825   4.47034725]
[ 10.98951135  10.96852944  10.92470948  10.93143458  11.01394191]
[ 5.15993444  3.87556556  4.26854111  4.40604556  5.57423444]

Summary

In [289]:
lines = sc.textFile('points.txt')
data = lines.map(parseVector).cache()

k_points = data.take(3)
temp_dist = 1.0

while temp_dist > 0.001:

    closest = data.map(
        lambda p : (closest_point(p, k_points), (p, 1)))
    
    point_stats = closest.reduceByKey(
        lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
    
    new_points = point_stats.map(
        lambda (x, (y, z)): (x, y / z)).collect()

    temp_dist = sum(map(lambda (x,y): np.sum((k_points[x] - y) ** 2), 
                    new_points))
    
    for (x, y) in new_points:
        k_points[x] = y
In [290]:
for x in k_points:
    print x[:5]
[ 9.9623807  9.931509   9.9043745  9.9688528  9.9388466]
[ 15.013431   14.8657509  14.8416114  15.0812099  14.9176245]
[ 5.1051483  5.0403859  4.9402722  5.1225752  5.0783431]

In [291]:
plot(k_points)

Movie Ratings

In [1]:
import awsdata
reload(awsdata)

awsdata.get('https://s3.amazonaws.com/research_computing_tutorials/movies.dat')
awsdata.get('https://s3.amazonaws.com/research_computing_tutorials/ratings.dat')
awsdata.get('https://s3.amazonaws.com/research_computing_tutorials/users.dat')

Pandas

In [10]:
import pandas as pd

def get_movie_data():
    
    unames = ['user_id','gender','age','occupation','zip']
    users = pd.read_table('users.dat', 
                          sep='::', header=None, names=unames)
    
    rnames = ['user_id', 'movie_id', 'rating', 'timestamp']
    ratings = pd.read_table('ratings.dat', 
                            sep='::', header=None, names=rnames)
    
    mnames = ['movie_id', 'title','genres']
    movies = pd.read_table('movies.dat', 
                           sep='::', header=None, names=mnames)

    return users, ratings, movies
In [11]:
users, ratings, movies = get_movie_data()
In [12]:
print users.head()
   user_id gender  age  occupation    zip
0        1      F    1          10  48067
1        2      M   56          16  70072
2        3      M   25          15  55117
3        4      M   45           7  02460
4        5      M   25          20  55455

[5 rows x 5 columns]

In [13]:
print ratings.head()
   user_id  movie_id  rating  timestamp
0        1      1193       5  978300760
1        1       661       3  978302109
2        1       914       3  978301968
3        1      3408       4  978300275
4        1      2355       5  978824291

[5 rows x 4 columns]

In [14]:
print movies.head()
   movie_id                               title                        genres
0         1                    Toy Story (1995)   Animation|Children's|Comedy
1         2                      Jumanji (1995)  Adventure|Children's|Fantasy
2         3             Grumpier Old Men (1995)                Comedy|Romance
3         4            Waiting to Exhale (1995)                  Comedy|Drama
4         5  Father of the Bride Part II (1995)                        Comedy

[5 rows x 3 columns]

Clean up

In [23]:
tmp = movies.title.str.match('(.*) \(([0-9]+)\)')
movies['year'] = tmp.map(lambda x: x[1] if len(x) > 0 else None)
movies['short_title'] = tmp.map(lambda x: x[0][:40] if len(x) > 0 else None)

Join

In [24]:
data = pd.merge(pd.merge(ratings, users), movies)

Compute

In [27]:
tmp = data[['short_title','rating']]
grp = tmp.groupby('short_title')
mean_rating = grp['rating'].agg(['mean','count'])
print mean_rating.sort('mean', ascending=False).head(10)
                                        mean  count
short_title                                        
Bittersweet Motel                   5.000000      1
Baby, The                           5.000000      1
Schlafes Bruder (Brother of Sleep)  5.000000      1
Hour of the Pig, The                5.000000      1
Ulysses (Ulisse)                    5.000000      1
Follow the Bitch                    5.000000      1
Smashing Time                       5.000000      2
Gate of Heavenly Peace, The         5.000000      3
I Am Cuba (Soy Cuba/Ya Kuba)        4.750000      4
Lamerica                            4.666667      6

[10 rows x 2 columns]

In [28]:
mask = mean_rating['count'] > 1000
print mean_rating.ix[mask].sort('mean', ascending=False).head(10)
                                              mean  count
short_title                                              
Shawshank Redemption, The                 4.554005   1935
Godfather, The                            4.527091   1901
Usual Suspects, The                       4.517949   1560
Schindler's List                          4.504985   2006
Raiders of the Lost Ark                   4.492847   2167
Star Wars: Episode IV - A New Hope        4.459961   2585
Dr. Strangelove or: How I Learned to Sto  4.443590   1170
Sixth Sense, The                          4.417228   2078
Casablanca                                4.409283   1422
One Flew Over the Cuckoo's Nest           4.390866   1489

[10 rows x 2 columns]

Spark

In [5]:
from pyspark import  SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '20g')

if 'sc' not in globals():
    sc = SparkContext( CLUSTER_URL, 'pyspark')
In [6]:
users = sc.textFile('users.dat').map(lambda x: (int(x.split('::')[0]), x))
print users.count()
users.take(5)
6040

Out[6]:
[(1, u'1::F::1::10::48067'),
 (2, u'2::M::56::16::70072'),
 (3, u'3::M::25::15::55117'),
 (4, u'4::M::45::7::02460'),
 (5, u'5::M::25::20::55455')]
In [7]:
ratings = sc.textFile('ratings.dat').map(lambda x: (int(x.split('::')[0]), x))
print ratings.count()
ratings.take(5)
875657

Out[7]:
[(1, u'1::1193::5::978300760'),
 (1, u'1::661::3::978302109'),
 (1, u'1::914::3::978301968'),
 (1, u'1::3408::4::978300275'),
 (1, u'1::2355::5::978824291')]
In [8]:
movies = sc.textFile('movies.dat').map(lambda x: (int(x.split('::')[0]), x))
print movies.count()
movies.take(5)
3883

Out[8]:
[(1, u"1::Toy Story (1995)::Animation|Children's|Comedy"),
 (2, u"2::Jumanji (1995)::Adventure|Children's|Fantasy"),
 (3, u'3::Grumpier Old Men (1995)::Comedy|Romance'),
 (4, u'4::Waiting to Exhale (1995)::Comedy|Drama'),
 (5, u'5::Father of the Bride Part II (1995)::Comedy')]

Join

In [13]:
data = ratings.join(users)
data.take(10)
Out[13]:
[(2052, (u'2052::2997::1::974661754', u'2052::M::1::10::46033')),
 (2052, (u'2052::3005::3::974661796', u'2052::M::1::10::46033')),
 (2052, (u'2052::3016::3::974661930', u'2052::M::1::10::46033')),
 (2052, (u'2052::3248::3::974661507', u'2052::M::1::10::46033')),
 (2052, (u'2052::3285::1::974661754', u'2052::M::1::10::46033')),
 (2052, (u'2052::1831::1::974661443', u'2052::M::1::10::46033')),
 (2052, (u'2052::223::4::974661479', u'2052::M::1::10::46033')),
 (2052, (u'2052::2826::1::974661685', u'2052::M::1::10::46033')),
 (2052, (u'2052::2827::2::974661754', u'2052::M::1::10::46033')),
 (2052, (u'2052::2683::4::974661754', u'2052::M::1::10::46033'))]
In [14]:
tmp = data.map( lambda x: '::'.join(x[1]))
tmp = tmp.map( lambda x: (int(x.split('::')[1]), x))
tmp.take(5)
Out[14]:
[(2997, u'2052::2997::1::974661754::2052::M::1::10::46033'),
 (3005, u'2052::3005::3::974661796::2052::M::1::10::46033'),
 (3016, u'2052::3016::3::974661930::2052::M::1::10::46033'),
 (3248, u'2052::3248::3::974661507::2052::M::1::10::46033'),
 (3285, u'2052::3285::1::974661754::2052::M::1::10::46033')]
In [19]:
data = tmp.join(movies)
data.count()
data.take(5)
Out[19]:
[(3072,
  (u'24::3072::4::978132276::24::F::25::7::10023',
   u'3072::Moonstruck (1987)::Comedy')),
 (3072,
  (u'36::3072::4::978063192::36::M::25::3::94123',
   u'3072::Moonstruck (1987)::Comedy')),
 (3072,
  (u'2124::3072::5::974649244::2124::M::25::0::12306',
   u'3072::Moonstruck (1987)::Comedy')),
 (3072,
  (u'132::3072::4::977428857::132::M::25::17::99709',
   u'3072::Moonstruck (1987)::Comedy')),
 (3072,
  (u'4296::3072::5::965270114::4296::M::56::6::33030',
   u'3072::Moonstruck (1987)::Comedy'))]
In [20]:
data = data.map(lambda x: '::'.join(x[1]))
data.take(5)
Out[20]:
[u'24::3072::4::978132276::24::F::25::7::10023::3072::Moonstruck (1987)::Comedy',
 u'36::3072::4::978063192::36::M::25::3::94123::3072::Moonstruck (1987)::Comedy',
 u'2124::3072::5::974649244::2124::M::25::0::12306::3072::Moonstruck (1987)::Comedy',
 u'132::3072::4::977428857::132::M::25::17::99709::3072::Moonstruck (1987)::Comedy',
 u'4296::3072::5::965270114::4296::M::56::6::33030::3072::Moonstruck (1987)::Comedy']

Cleaning

In [21]:
def keep_cols(x):
    values = x.split('::')
    return (values[2], values[5], values[10], 1) #add 1 for the count
In [22]:
clean = data.map(keep_cols)
clean.take(5)
Out[22]:
[(u'4', u'F', u'Moonstruck (1987)', 1),
 (u'4', u'M', u'Moonstruck (1987)', 1),
 (u'5', u'M', u'Moonstruck (1987)', 1),
 (u'4', u'M', u'Moonstruck (1987)', 1),
 (u'5', u'M', u'Moonstruck (1987)', 1)]

Compute

Groupby movie

In [29]:
tmp = clean.map(lambda x: (x[2], x)).groupByKey()
print tmp.count()
3693

Compute the mean rating and count.

In [30]:
def movie_count_rating(x):
    title = x[0]
    mean_rating = 0.
    count = len(x[1])
    for rating in x[1]:
        mean_rating += int(rating[0])
    return (title, count, mean_rating/float(count))
In [31]:
results = tmp.map(movie_count_rating)
results.take(3)
Out[31]:
[(u'Harriet the Spy (1996)', 104, 3.0961538461538463),
 (u'Mad Love (1995)', 41, 2.1219512195121952),
 (u'Smoke (1995)', 180, 3.7222222222222223)]

Highest rated movies?

In [32]:
results.map(lambda x: (x[2], x)).sortByKey(False).take(10)
Out[32]:
[(5.0, (u'Schlafes Bruder (Brother of Sleep) (1995)', 1, 5.0)),
 (5.0, (u'Smashing Time (1967)', 2, 5.0)),
 (5.0, (u'Baby, The (1973)', 1, 5.0)),
 (5.0, (u'Bittersweet Motel (2000)', 1, 5.0)),
 (5.0, (u'Gate of Heavenly Peace, The (1995)', 3, 5.0)),
 (5.0, (u'Follow the Bitch (1998)', 1, 5.0)),
 (5.0, (u'Ulysses (Ulisse) (1954)', 1, 5.0)),
 (5.0, (u'Hour of the Pig, The (1993)', 1, 5.0)),
 (4.75, (u'I Am Cuba (Soy Cuba/Ya Kuba) (1964)', 4, 4.75)),
 (4.666666666666667, (u'Lamerica (1994)', 6, 4.666666666666667))]

Highest rated with more than 1000 ratings?

In [33]:
top = results.filter(lambda x: x[1] > 1000).map(lambda x: (x[2], x)).sortByKey(False).take(10)
In [34]:
top
Out[34]:
[(4.554005167958656,
  (u'Shawshank Redemption, The (1994)', 1935, 4.554005167958656)),
 (4.52709100473435, (u'Godfather, The (1972)', 1901, 4.52709100473435)),
 (4.517948717948718, (u'Usual Suspects, The (1995)', 1560, 4.517948717948718)),
 (4.504985044865403, (u"Schindler's List (1993)", 2006, 4.504985044865403)),
 (4.4928472542685745,
  (u'Raiders of the Lost Ark (1981)', 2167, 4.4928472542685745)),
 (4.459961315280464,
  (u'Star Wars: Episode IV - A New Hope (1977)', 2585, 4.459961315280464)),
 (4.443589743589744,
  (u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)',
   1170,
   4.443589743589744)),
 (4.417228103946102, (u'Sixth Sense, The (1999)', 2078, 4.417228103946102)),
 (4.409282700421941, (u'Casablanca (1942)', 1422, 4.409282700421941)),
 (4.39086635325722,
  (u"One Flew Over the Cuckoo's Nest (1975)", 1489, 4.39086635325722))]
In [35]:
for m in top:
    print m[1][2], m[1][0][:30]
4.55400516796 Shawshank Redemption, The (199
4.52709100473 Godfather, The (1972)
4.51794871795 Usual Suspects, The (1995)
4.50498504487 Schindler's List (1993)
4.49284725427 Raiders of the Lost Ark (1981)
4.45996131528 Star Wars: Episode IV - A New 
4.44358974359 Dr. Strangelove or: How I Lear
4.41722810395 Sixth Sense, The (1999)
4.40928270042 Casablanca (1942)
4.39086635326 One Flew Over the Cuckoo's Nes

Logistic Regression

In [1]:
import numpy as np
from sklearn import datasets
from sklearn.utils import shuffle

random_state = np.random.RandomState(0)

iris = datasets.load_iris()
X = iris.data
y = iris.target

Make it a binary classification problem by removing the third class

In [2]:
X, y = X[y != 0], y[y != 0]
n_samples, n_features = X.shape

y[y==1] = 0
y[y==2] = 1
In [3]:
print X.shape, y.shape
print set(y)
(100, 4) (100,)
set([0, 1])

Using sklearn

In [4]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.cross_validation import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

clf = LogisticRegression()
clf.fit(X_train,y_train)
y_pred_test = clf.predict(X_test)
y_pred_train = clf.predict(X_train)
print accuracy_score(y_test, y_pred_test)
print accuracy_score(y_train, y_pred_train)
0.85
0.9875

Save to file

In [5]:
print y_train.shape
print y_train.reshape(y_train.shape[0],1).shape
print X_train.shape
cX = np.concatenate((y_train.reshape(80,1), X_train), axis=1)
cX.shape
(80,)
(80, 1)
(80, 4)

Out[5]:
(80, 5)

Do the same for the test set.

In [6]:
np.savetxt('iris_train.csv', cX, delimiter=' ', fmt='%0.4f')
!head iris_train.csv
1.0000 7.6000 3.0000 6.6000 2.1000
1.0000 6.0000 3.0000 4.8000 1.8000
0.0000 6.8000 2.8000 4.8000 1.4000
0.0000 5.8000 2.6000 4.0000 1.2000
1.0000 6.0000 2.2000 5.0000 1.5000
0.0000 6.7000 3.1000 4.4000 1.4000
0.0000 5.5000 2.6000 4.4000 1.2000
1.0000 6.3000 2.5000 5.0000 1.9000
0.0000 5.2000 2.7000 3.9000 1.4000
1.0000 7.7000 2.8000 6.7000 2.0000

In [7]:
cX = np.concatenate((y_test.reshape(len(y_test),1), X_test), axis=1)
np.savetxt('iris_test.csv', cX, delimiter=' ', fmt='%0.4f')

With Spark

In [8]:
from pyspark import  SparkContext
sc = SparkContext( CLUSTER_URL, 'pyspark')
In [9]:
points = sc.textFile('iris_train.csv',10) # split into 10 partitions
points.take(5)
Out[9]:
[u'1.0000 7.6000 3.0000 6.6000 2.1000',
 u'1.0000 6.0000 3.0000 4.8000 1.8000',
 u'0.0000 6.8000 2.8000 4.8000 1.4000',
 u'0.0000 5.8000 2.6000 4.0000 1.2000',
 u'1.0000 6.0000 2.2000 5.0000 1.5000']
In [10]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

parsed_data = points.map(lambda line: np.array([float(x) for x in line.split(' ')]))
parsed_data.take(5)
Out[10]:
[array([ 1. ,  7.6,  3. ,  6.6,  2.1]),
 array([ 1. ,  6. ,  3. ,  4.8,  1.8]),
 array([ 0. ,  6.8,  2.8,  4.8,  1.4]),
 array([ 0. ,  5.8,  2.6,  4. ,  1.2]),
 array([ 1. ,  6. ,  2.2,  5. ,  1.5])]
In [11]:
model = LogisticRegressionWithSGD.train(parsed_data)

y = parsed_data.map(lambda x: int(x[0]))
y.take(5)
Out[11]:
[1, 1, 0, 0, 1]
In [12]:
y_pred = parsed_data.map(lambda x: model.predict(x[1:]))
y_pred.take(5)
Out[12]:
[1, 1, 0, 0, 1]
In [13]:
tmp = y.zip(y_pred)
tmp.take(5)
Out[13]:
[(1, 1), (1, 1), (0, 0), (0, 0), (1, 1)]

Training accuracy

In [14]:
1.0 - tmp.filter(lambda (y, p): y!=p).count()/float(parsed_data.count())
Out[14]:
0.975

Test accuracy

In [15]:
points = sc.textFile('iris_test.csv',10) # split into 10 partitions
parsed_data = points.map(lambda line: np.array([float(x) for x in line.split(' ')]))
tmp = parsed_data.map(lambda x: (int(x[0]), model.predict(x[1:])))
1.0 - tmp.filter(lambda (y, p): y!=p).count()/float(parsed_data.count())
Out[15]:
0.9