Commit ca4d3ba

added minmax_use_mappartitions_v2.py
1 parent 93cbf20 commit ca4d3ba

File tree

2 files changed: +187 -0 lines changed
Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

#-----------------------------------------------------
# Find the minimum and maximum of all input by
# using the mapPartitions() transformation.
#
# The idea is that each partition finds
# (local_min, local_max, local_count)
# and then we find (final_min, final_max, final_count)
# over all partitions.
#
# input ---- N partitions ----> partition-1, partition-2, ..., partition-N
#
# partition-1 => local_1 = (local_min_1, local_max_1, local_count_1)
# partition-2 => local_2 = (local_min_2, local_max_2, local_count_2)
# ...
# partition-N => local_N = (local_min_N, local_max_N, local_count_N)
#
# final_min_max = minmax(local_1, local_2, ..., local_N)
#
#------------------------------------------------------
# Input Parameters:
#    INPUT_PATH as a file of numbers
#
# Example: sample_numbers.txt
#
# $ cat sample_numbers.txt
#23,24,22,44,66,77,44,44,555,666
#12,4,555,66,67,68,57,55,56,45,45,45,66,77
#34,35,36,97300,78,79
#120,44,444,445,345,345,555
#11,33,34,35,36,37,47,7777,8888,6666,44,55
#10,11,44,66,77,78,79,80,90,98,99,100,102,103,104,105
#6,7,8,9,10
#8,9,10,12,12
#7777
#222,333,444,555,666,111,112,5,113,114
#5555,4444,24
#
#
#-------------------------------------------------------
# @author Mahmoud Parsian
#-------------------------------------------------------
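#
# A small illustration (hypothetical data, not taken
# from the input file above): with 2 partitions,
#   partition-1 holds ["23,24,22", "44,66"] => local_1 = (22, 66, 5)
#   partition-2 holds ["7,8"]               => local_2 = (7, 8, 2)
# then final = (min(22, 7), max(66, 8), 5+2) = (7, 66, 7)
#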

#
#
#==========================================
# Find (min, max, count) for a given single partition.
#
# partition_iterator: an iterator over the input
# records of a single partition; each record has
# the format:
#   <number><,><number><,>...<,><number>
#
def minmax(partition_iterator):
    #
    print("type(partition_iterator)=", type(partition_iterator))
    # possible outputs:
    #   type(partition_iterator)= <type 'itertools.chain'>
    #   type(partition_iterator)= <type 'generator'>
    #
    try:
        first_record = next(partition_iterator)
        print("first_record=", first_record)
    except StopIteration:
        # for empty partitions
        return [None]
    #
    numbers = [int(n) for n in first_record.split(",")]
    local_min = min(numbers)
    local_max = max(numbers)
    local_count = len(numbers)
    #
    # handle the remaining records in the partition
    for record in partition_iterator:
        #print("record=", record)
        numbers = [int(n) for n in record.split(",")]
        min2 = min(numbers)
        max2 = max(numbers)
        # update min, max, and count
        local_count += len(numbers)
        local_max = max(local_max, max2)
        local_min = min(local_min, min2)
    # end-for
    return [(local_min, local_max, local_count)]
#end-def
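#
# Example (a minimal sketch with hypothetical input;
# minmax() works on any iterator of CSV strings):
#   minmax(iter(["3,1,2", "10,4"])) returns [(1, 10, 5)]
#   minmax(iter([]))                returns [None]
#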
#
#==========================================
#
# find the final (min, max, count) over all partitions
# min_max_count_list = [
#   (min_1, max_1, count_1),
#   (min_2, max_2, count_2),
#   ...
#   (min_N, max_N, count_N)
# ]
#
def find_min_max_count(min_max_count_list):
    first_time = True
    # iterate over the tuple3 elements of min_max_count_list
    for local_min, local_max, local_count in min_max_count_list:
        if first_time:
            final_min = local_min
            final_max = local_max
            final_count = local_count
            first_time = False
        else:
            final_min = min(final_min, local_min)
            final_max = max(final_max, local_max)
            final_count += local_count
    #end-for
    return (final_min, final_max, final_count)
#end-def
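#
# Example (hypothetical values):
#   find_min_max_count([(1, 10, 5), (0, 7, 2)])
#   returns (0, 10, 7)
#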
#==========================================
#
def debug_partition(iterator):
    print("===begin-partition===")
    for x in iterator:
        print(x)
    print("===end-partition===")
#end-def
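#
# Note: debug_partition can be applied with
#   rdd.foreachPartition(debug_partition)
# When running on a cluster, its print output goes to
# the executors' stdout logs, not the driver console.
#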
#
#==========================================
# main():

if len(sys.argv) != 2:
    print("Usage: ", __file__, "<input-path>", file=sys.stderr)
    sys.exit(-1)

# create an instance of SparkSession
spark = SparkSession.builder.appName("minmax").getOrCreate()
#

# handle the input parameter
input_path = sys.argv[1]
print("input_path=", input_path)

#=====================================
# read input and apply mapPartitions()
#=====================================
# rdd: RDD[String]
rdd = spark.sparkContext.textFile(input_path)
print("rdd=", rdd)
print("rdd.count=", rdd.count())
print("rdd.collect()=", rdd.collect())
print("rdd.getNumPartitions()=", rdd.getNumPartitions())
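#
# Note: the count() and collect() calls above are debug
# aids only; collect() brings the entire RDD to the
# driver, which is safe only for small sample inputs.
#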
#
#=====================================
# find (min, max, count) per partition;
# the custom function is minmax
#=====================================
# min_max_count: RDD[(min, max, count)]
# min_max_count: [(min_1, max_1, count_1),
#                 (min_2, max_2, count_2),
#                 ...,
#                 (min_N, max_N, count_N)]
#
# apply mapPartitions() and then drop the None
# elements produced by empty partitions
min_max_count = rdd.mapPartitions(minmax).filter(lambda x: x is not None)
#
print("min_max_count=", min_max_count)
print("min_max_count.count=", min_max_count.count())
min_max_count_list = min_max_count.collect()
print("min_max_count.collect()=", min_max_count_list)

#=====================================
# find the final (min, max, count) over all partitions
#=====================================
final_min, final_max, final_count = find_min_max_count(min_max_count_list)
print("final: (min, max, count)= (", final_min, ", ", final_max, ", ", final_count, ")")

# done!
spark.stop()
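#
# For the sample_numbers.txt shown in the header, the
# final line should read as follows (values computed by
# hand from that sample; independent of partitioning):
#   final: (min, max, count)= ( 4 ,  97300 ,  89 )
#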
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
#-----------------------------------------------------
# This is a shell script to run minmax_use_mappartitions.py
#-----------------------------------------------------
# @author Mahmoud Parsian
#-----------------------------------------------------
export SPARK_HOME="/book/spark-3.2.0"
export INPUT_PATH="/book/code/chap10/sample_numbers.txt"
export SPARK_PROG="/book/code/chap10/minmax_use_mappartitions.py"
#
# run the PySpark program:
$SPARK_HOME/bin/spark-submit $SPARK_PROG $INPUT_PATH
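#
# Example invocation (the script's file name is not
# shown in this commit; "run_minmax.sh" is hypothetical):
#   $ bash run_minmax.sh
#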
