Here is my code for the famous "finding the closest pair of points" problem from CLRS book section 33.4 using a divide-and-conquer approach (this code returns the minimum distance between pair of points).
This code works correctly but it's slow. As seen in the code, I pre-sort so I avoid sorting at every recursive call, however, my run time is still too long.
Please help me speed this code up; other suggestions about this code is also much appreciated as I am a novice Python enthusiast.
import statistics as stats
import math
# helper functions:
def two_point_distance(p0,p1):
# returns distance between two (x,y) pairs
return ( (p0[0]-p1[0])**2 + (p0[1] - p1[1])**2 )**0.5
def combine_xy(x_arr,y_arr):
# combine x_arr and y_arr to combined list of (x,y) tuples
return list(zip(x_arr,y_arr))
def find_closest_distance_brute(xy_arr):
# brute force approach to find closest distance
dmin = math.inf
for i in range (len(xy_arr)-1):
dis_storage = []
for j in range (1,len(xy_arr)-i):
d_i_ipj = two_point_distance(xy_arr[i], xy_arr[i+j])
dis_storage.append(d_i_ipj)
dis_storage_min = min(dis_storage)
if dis_storage_min < dmin:
dmin = dis_storage_min
return dmin
def calc_median_x(xy_arr):
# return median of x values in list of (x,y) points
return stats.median([val[0] for val in xy_arr ])
def filter_set(xy_arr_y_sorted, median, distance):
# filter the entire set such than |x-median|<=min distance in halves
out = []
for val in xy_arr_y_sorted:
val_x = val[0]
if abs(val_x-median) <= distance:
out.append(val)
return out
def x_sort(xy_arr):
# sort array according to x value
return sorted(xy_arr, key=lambda val: val[0])
def y_sort(xy_arr):
# sort array according to y value
return sorted(xy_arr, key=lambda val: val[1])
def split_array(arr_x_sorted, arr_y_sorted,median):
# split array of size n to two arrays of n/2
# input is the same array twice, one sorted wrt x, the other wrt y
leq_arr_x_sorted, grt_arr_x_sorted = [],[]
dmy_x = 0 # switch between left and right when val_x==median
for val in arr_x_sorted:
val_x = val[0]
if val_x < median:
leq_arr_x_sorted.append(val)
if val_x > median:
grt_arr_x_sorted.append(val)
if val_x == median:
if dmy_x == 0:
leq_arr_x_sorted.append(val)
dmy_x = 1
else:
grt_arr_x_sorted.append(val)
dmy_x = 0
leq_arr_y_sorted, grt_arr_y_sorted = [],[]
dmy_y = 0 # switch between left and right when val_x==median
for val in arr_y_sorted:
val_x = val[0]
if val_x < median:
leq_arr_y_sorted.append(val)
if val_x > median:
grt_arr_y_sorted.append(val)
if val_x == median:
if dmy_y == 0:
leq_arr_y_sorted.append(val)
dmy_y = 1
else:
grt_arr_y_sorted.append(val)
dmy_y = 0
return leq_arr_x_sorted, leq_arr_y_sorted, grt_arr_x_sorted, grt_arr_y_sorted
def find_min_distance_in_rec(xy_arr_y_sorted,dmin):
# takes in array sorted in y, and minimum distance of n/2 halves
# for each point it computes distance to 7 subsequent points
# output min distance encountered
dmin_rec = dmin
if len(xy_arr_y_sorted) == 1:
return math.inf
if len(xy_arr_y_sorted) > 7:
for i in range(len(xy_arr_y_sorted)-7):
dis_storage = []
for j in range(1,8):
d_i_ipj = two_point_distance(xy_arr_y_sorted[i],xy_arr_y_sorted[i+j])
dis_storage.append(d_i_ipj)
dis_storage_min = min(dis_storage)
if dis_storage_min < dmin_rec:
dmin_rec = dis_storage_min
for k in range(len(xy_arr_y_sorted)-7, len(xy_arr_y_sorted)-1):
dis_storage = []
for l in range(1,len(xy_arr_y_sorted)-k):
d_k_kpl = two_point_distance(xy_arr_y_sorted[k], xy_arr_y_sorted[k+l])
dis_storage.append(d_k_kpl)
dis_storage_min = min(dis_storage)
if dis_storage_min < dmin_rec:
dmin_rec = dis_storage_min
else:
for m in range(0,len(xy_arr_y_sorted)-1):
dis_storage = []
for n in range (1,len(xy_arr_y_sorted)-m):
d_m_mpn = two_point_distance(xy_arr_y_sorted[m], xy_arr_y_sorted[m+n])
dis_storage.append(d_m_mpn)
dis_storage_min = min(dis_storage)
if dis_storage_min < dmin_rec:
dmin_rec = dis_storage_min
return dmin_rec
def find_closest_distance_recur(xy_arr_x_sorted, xy_arr_y_sorted):
# recursive function to find closest distance between points
if len(xy_arr_x_sorted) <=3 :
return find_closest_distance_brute(xy_arr_x_sorted)
median = calc_median_x(xy_arr_x_sorted)
leq_arr_x_sorted, leq_arr_y_sorted , grt_arr_x_sorted, grt_arr_y_sorted = split_array(xy_arr_x_sorted, xy_arr_y_sorted, median)
distance_left = find_closest_distance_recur(leq_arr_x_sorted, leq_arr_y_sorted)
distance_right = find_closest_distance_recur(grt_arr_x_sorted, grt_arr_y_sorted)
distance_min = min(distance_left, distance_right)
filt_out = filter_set(xy_arr_y_sorted, median, distance_min)
distance_filt = find_min_distance_in_rec(filt_out, distance_min)
return min(distance_min, distance_filt)
def find_closest_point(x_arr, y_arr):
# input is x,y points in two arrays, all x's in x_arr, all y's in y_arr
xy_arr = combine_xy(x_arr,y_arr)
xy_arr_x_sorted = x_sort(xy_arr)
xy_arr_y_sored = y_sort(xy_arr)
min_distance = find_closest_distance_recur(xy_arr_x_sorted, xy_arr_y_sored)
return min_distance
2 Answers 2
Speeding up the helper.
Exponentiation is an expensive operation, time wise. Instead of (p0[0]-p1[0])**2
, use (p0[0]-p1[0])*(p0[0]-p1[0])
Consider returning distance-squared. With the "brute force" solution, the minimum distance corresponds to the minimum squared distance, so you can find the square-root of the minimum, instead of the minimum of the square-roots, and save many expensive calculations.
Check: math.sqrt(...)
may be faster than (...)**0.5
Speeding up find_closest_distance_brute()
:
What is dis_storage
for? You accumulate distances in a list, take the minimum of the list, and if the minimum is less than a global minimum, you update the global minimum. The list is then thrown away.
There is no need to build up the list! It is a waste of time and memory. A generator expression can pass a "list" of data to the min
function, without actually creating the list.
dis_storage_min = min( two_point_distance(xy_arr[i], pnt)
for pnt in xy_arr[i+1:])
Passing xy_arr[i]
to the two_point_distance()
function is inefficient; you should lookup the value from the array once and save it in a local variable, instead of looking up the same value each time through the loop. The for
loop is actually the ideal place to get the value looked up. Instead of looping over the range()
of the len()
of xy_arr
, loop over the values from the array. Looping over the enumeration gives the index at the same time:
dmin = math.inf
for i, pnt_i in enumerate(xy_arr[:-1]):
dis_storage_min = min( two_point_distance(pnt_i, pnt_j)
for pnt_j in xy_arr[i+1:])
if dis_storage_min < dmin:
dmin = dis_storage_min
Note the use of xy_arr[:-1]
to avoid the last point of the array, and xy_arr[i+1:]
to loop over all points after the ith value.
The above can be refactored into one min(...)
statements, with a generator using two for
loops. Exercise (temporary) left to student.
Like the min
function above, you can use a generator expression to calculate the median without needlessly creating a list. Just remove the []
’s from the call:
def calc_median_x(xy_arr):
# return median of x values in list of (x,y) points
return stats.median( val[0] for val in xy_arr )
The filter_set function can be written using list comprehension, instead of the inefficient list.append()
approach:
def filter_set(xy_arr_y_sorted, median, distance):
# filter the entire set such than |x-median|<=min distance in halves
return [ val for val in xy_arr_y_sorted
if abs(val[0] - median) <= distance ]
split_array()
:
Again, list comprehension is better than appending.
leq_arr_x_sorted = [ val for val in arr_x_sorted if val[0] < median ]
geq_arr_x_sorted = [ val for val in arr_x_sorted if val[0] > median ]
eq_x = [ val for val in arr_x_sorted if val[0] == median ]
You can then add half of the eq_x array to the leq array, the other half to the geq array:
n = len(eq_x)//2
leq_arr_x_sorted = leq_arr_x_sorted + eq_x[:n]
geq_arr_x_sorted = eq_x[n:] + geq_arr_x_sorted
Similar comments of course apply to the arr_y_sorted
.
But this may still be busy work. If the array is already sorted by x coordinate, then:
n = len(arr_x_sorted) // 2
leq_arr_x_sorted = arr_x_sorted[:n]
geq_arr_x_sorted = arr_x_sorted[n:]
will give you approximately the same thing. It diverges when multiple copies of the median exist but are not balanced across the centre. Ie:
[ 1, 2, 3, 4, 4, 4, 4, 8 ]
will split as:
[ 1, 2, 3, 4, 4 ] [ 4, 4, 8 ] // yours
[ 1, 2, 3, 4 ] [ 4, 4, 4, 8 ] // mine
Whether you can use this will depend on whether you need the y sorted data partitioned in exactly the same way as the x.
find_min_distance_in_rect()
:
Similar comments about looping, of course.
The else:
clause (7 or less points) looks like you are implementing a brute force search for the minimum distance. But you’ve already written that; just call find_min_dist_brute(xy_arr_y_sorted)
. It won’t speed up your code, but it will make it shorter.
Looping over all but the last 7 points, and searching the 7 points immediately after them, and then looping over the last 7 points and searching the remaining 7-k points is very awkward code. The theory says you don’t need to compare more than 7 comparisons per point, and that is important to keep the algorithm O(n). But coding wise, that’s a mess.
Better: for every point, compare it with each point after it, until you come to a point whose y coordinate is further than dmin
. Not a 7 anywhere in sight. Just a double for loop, with a break statement in the inner loop. The theory guarantees that you won’t need to loop more than 7 times in the inner loop; we don’t need that codified as well. We might actually gain a tiny bit of speed, because you’ll often loop less than 7 times.
-
\$\begingroup\$ Thanks @AJNeufeld; I incorporated all your suggestions. In addition, I re-wrote the
find_min_distance_in_rec
based on your earlier suggestions onfind_closest_distance_brute()
function where I now use list comprehension and generator expressions. The run time improved by 30% which is significant but the code is still a tad bit slow. I appreciate if you have further suggestions for speed improvement. \$\endgroup\$mghah– mghah2018年07月16日 01:52:59 +00:00Commented Jul 16, 2018 at 1:52 -
\$\begingroup\$ I’ve provided a few additional points. Once you’ve incorporated those, if you still need additional help, it would be wise to post a follow up question containing the improved code, with a link back to this one, and a link in this one to the new question. See what should I do when someone answers my question for additional details. \$\endgroup\$AJNeufeld– AJNeufeld2018年07月16日 21:28:54 +00:00Commented Jul 16, 2018 at 21:28
-
\$\begingroup\$ I created a follow up question since the code changed significantly. link to follow up question \$\endgroup\$mghah– mghah2018年07月17日 17:51:25 +00:00Commented Jul 17, 2018 at 17:51
def split_array(arr_x_sorted, arr_y_sorted,median): # split array of size n to two arrays of n/2 # input is the same array twice, one sorted wrt x, the other wrt y leq_arr_x_sorted, grt_arr_x_sorted = [],[] dmy_x = 0 # switch between left and right when val_x==median for val in arr_x_sorted: val_x = val[0] if val_x < median: leq_arr_x_sorted.append(val) if val_x > median: grt_arr_x_sorted.append(val) if val_x == median: if dmy_x == 0: leq_arr_x_sorted.append(val) dmy_x = 1 else: grt_arr_x_sorted.append(val) dmy_x = 0 leq_arr_y_sorted, grt_arr_y_sorted = [],[] dmy_y = 0 # switch between left and right when val_x==median for val in arr_y_sorted: val_x = val[0] if val_x < median: leq_arr_y_sorted.append(val) if val_x > median: grt_arr_y_sorted.append(val) if val_x == median: if dmy_y == 0: leq_arr_y_sorted.append(val) dmy_y = 1 else: grt_arr_y_sorted.append(val) dmy_y = 0 return leq_arr_x_sorted, leq_arr_y_sorted, grt_arr_x_sorted, grt_arr_y_sorted
There's a 15-line chunk of code there that's repeated twice, so if this is a correct implementation that could be factored out.
However, I don't believe that it is a correct implementation, on two counts.
My understanding is that the end result should be four arrays such that
leq_arr_x_sorted
andleq_arr_y_sorted
contain the same points sorted on different projections. But thedmy_x
/dmy_y
separation of points whosex
coordinate ismedian
doesn't seem to guarantee that those points will be sent to the same half in thex
-projection and they
-projection. In order to guarantee that an additional pre-condition is needed: most obviously that inarr_x_sorted
ties are broken byy
; but that pre-condition is not produced byx_sort
.My understanding is that the split should be 50-50, even in degenerate cases where all of the points have the same
x
-coordinate.
The easiest way that I can see to fix both problems is to do a "Dutch flag" partition into less-than, equal, and greater-than; and then to split the values sorted on y
and use that split for both (since all the x
-coordinates are the same, it's sorted on both coordinates):
def split_array(x_sorted, y_sorted, median_x):
lt_x_sorted = [point for point in x_sorted if point[0] < median_x]
gt_x_sorted = [point for point in x_sorted if point[0] > median_x]
lt_y_sorted = [point for point in y_sorted if point[0] < median_x]
eq_y_sorted = [point for point in y_sorted if point[0] == median_x]
gt_y_sorted = [point for point in y_sorted if point[0] > median_x]
# Make the two halves equal in size
split = len(x_sorted) // 2 - len(lt_x_sorted)
eq_left = eq_y_sorted[:split]
eq_right = eq_y_sorted[split:]
return lt_x_sorted + eq_left, lt_y_sorted + eq_left,
eq_right + gt_x_sorted, eq_right + gt_y_sorted
def find_closest_distance_recur(xy_arr_x_sorted, xy_arr_y_sorted): # recursive function to find closest distance between points if len(xy_arr_x_sorted) <=3 : return find_closest_distance_brute(xy_arr_x_sorted) median = calc_median_x(xy_arr_x_sorted)
Hang on! The median of a sorted array doesn't need a call to stats.median
! xy_arr_x_sorted[len(xy_arr_x_sorted) // 2][0]
.
leq_arr_x_sorted, leq_arr_y_sorted , grt_arr_x_sorted, grt_arr_y_sorted = split_array(xy_arr_x_sorted, xy_arr_y_sorted, median) ... filt_out = filter_set(xy_arr_y_sorted, median, distance_min) distance_filt = find_min_distance_in_rec(filt_out, distance_min)
By filtering to a single strip, this will end up comparing points which have already been compared. It might be worth filtering leq_arr_y_sorted
and grt_arr_y_sorted
and then modifying find_min_distance_in_rec
to take the strips separately and only do cross-strip comparisons.
-
1\$\begingroup\$ If the arrays become unsorted after calls to
split_array
that means there's a bug insplit_array
, but I can't see it. On the other hand, if you're willing to accept thatsplit_array
will unsort the elements, I don't see why it's worth sorting them in the first place. In either case, it sounds like the code needs a lot more comments to document what assumptions are made and what assumptions should not be made because they're unsafe. \$\endgroup\$Peter Taylor– Peter Taylor2018年07月19日 07:21:06 +00:00Commented Jul 19, 2018 at 7:21 -
1\$\begingroup\$ I concur. The arrays becoming unsorted in
split_array
would be a bug, but I can see it either. If an array is sorted, iterating over it and appending each element to one of two arrays, even randomly, will result in two sorted arrays. And the median of a sorted array does not need a call tostats.median
. Most worrisome, if you’ve actually accepted thatsplit_array
returns unsorted data, why would you assign the returned values to variables withsorted
in their name? \$\endgroup\$AJNeufeld– AJNeufeld2018年07月19日 14:06:40 +00:00Commented Jul 19, 2018 at 14:06 -
\$\begingroup\$ Peter’s calculation of median is slightly incorrect 50% of the time, so could give a different answer than
stats.median
at those times. Whenlen()
is even, the correct median would be the average of the two middle values. I chalk that minor faux-pas up to laziness; the major point was clear: the median of a sorted array can be obtained in O(1) time, instead of from an O(n log n) library call. \$\endgroup\$AJNeufeld– AJNeufeld2018年07月19日 14:19:54 +00:00Commented Jul 19, 2018 at 14:19 -
1\$\begingroup\$ @DRmo This is Code Review, not Code Debugging. Peter’s review caught a bug in one of your functions, based on the understanding one can glean from sparsely commented code, & demonstrated the function’s bug with sample input in the follow up question’s answer. He offered suggestions on how to fix the bug. Asking him to find input that produces buggy final output is beyond the scope of CR. Your argument suggests your program is "bug free" despite a demonstrable bug in a subroutine. This is a risky & questionable stance to take. Say thank-you, upvote the answer, fix the bug, & move on. \$\endgroup\$AJNeufeld– AJNeufeld2018年07月19日 19:00:05 +00:00Commented Jul 19, 2018 at 19:00
-
1\$\begingroup\$ @DRmo, I've just dug out my copy of CLR and I stand by the term bug: the code is documented to implement the book's approach, and the book explicitly says that the points in \$X_L\$ and \$Y_L\$ are the same. That aside, I don't understand the asymmetry in insisting that I should provide you with a test case which demonstrates amplification of a bug that is already demonstrated to exist, while you claim that my proposed replacement introduces bugs but don't provide a test case to support that. Post your test suite and I'll fix any bugs in my suggestion. \$\endgroup\$Peter Taylor– Peter Taylor2018年07月19日 21:30:19 +00:00Commented Jul 19, 2018 at 21:30
Explore related questions
See similar questions with these tags.