Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Dec 22, 2021. It is now read-only.

Commit 24f1f7c

Browse files
committed
PERL-1121 - Fix BulkWriteResult inserted IDs
1 parent ecd7cda commit 24f1f7c

File tree

121 files changed

+2600
-8348
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

121 files changed

+2600
-8348
lines changed

‎lib/MongoDB/BulkWriteResult.pm

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ sub _parse_cmd_result {
161161
MongoDB::UsageError->throw("parse requires 'op' and 'result' arguments");
162162
}
163163

164-
my ( $op, $op_count, $batch_count, $result, $cmd_doc ) =
165-
@{$args}{qw/op op_count batch_count result cmd_doc/};
164+
my ( $op, $op_count, $batch_count, $result, $cmd_doc, $idx_map ) =
165+
@{$args}{qw/op op_count batch_count result cmd_doc idx_map/};
166166

167167
$result = $result->output
168168
if eval { $result->isa("MongoDB::CommandResult") };
@@ -197,11 +197,16 @@ sub _parse_cmd_result {
197197
$attrs{upserted_count} = @{ $result->{upserted} };
198198
}
199199

200+
my %error_idx = (
201+
map { $_->{index} => 1 } @{ $result->{writeErrors} },
202+
);
203+
200204
# recover _ids from documents
201205
if ( exists($result->{n}) && $op eq 'insert' ) {
202206
my @pairs;
203207
my $docs = {@$cmd_doc}->{documents};
204208
for my $i ( 0 .. $result->{n}-1 ) {
209+
next if $error_idx{$i};
205210
push @pairs, { index => $i, _id => $docs->[$i]{metadata}{_id} };
206211
}
207212
$attrs{inserted} = \@pairs;
@@ -220,6 +225,12 @@ sub _parse_cmd_result {
220225
$attrs{modified_count} = ( $op eq 'update' || $op eq 'upsert' ) ?
221226
$result->{nModified} : 0;
222227

228+
# Remap all indices back to original queue index
229+
# in unordered batches, these numbers can end up pointing to the wrong index
230+
for my $attr (qw/write_errors upserted inserted/) {
231+
map { $_->{index} = $idx_map->[$_->{index}] } @{ $attrs{$attr} };
232+
}
233+
223234
return $class->_new(%attrs);
224235
}
225236

@@ -302,19 +313,15 @@ sub _merge_result {
302313
$self->_set_modified_count(undef);
303314
}
304315

305-
# Append error and upsert docs, but modify index based on op count
306-
my $op_count = $self->op_count;
316+
# Append error and upsert docs, index is dealt with in _parse_cmd_result
307317
for my $attr (qw/write_errors upserted inserted/) {
308-
for my $doc ( @{ $result->$attr } ) {
309-
$doc->{index} += $op_count;
310-
}
311318
push @{ $self->$attr }, @{ $result->$attr };
312319
}
313320

314321
# Append write concern errors without modification (they have no index)
315322
push @{ $self->write_concern_errors }, @{ $result->write_concern_errors };
316323

317-
$self->_set_op_count( $op_count + $result->op_count );
324+
$self->_set_op_count( $self->op_count + $result->op_count );
318325
$self->_set_batch_count( $self->batch_count + $result->batch_count );
319326

320327
return 1;

‎lib/MongoDB/Op/_BulkWrite.pm

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,20 +167,22 @@ my %OP_MAP = (
167167
sub _execute_write_command_batch {
168168
my ( $self, $link, $batch, $result ) = @_;
169169

170-
my ( $type, $docs ) = @$batch;
170+
my ( $type, $docs, $idx_map ) = @$batch;
171171
my ( $cmd, $op_key ) = @{ $OP_MAP{$type} };
172172

173173
my $boolean_ordered = boolean( $self->ordered );
174174
my ( $db_name, $coll_name, $wc ) =
175175
map { $self->$_ } qw/db_name coll_name write_concern/;
176176

177177
my @left_to_send = ($docs);
178+
my @sending_idx_map = ($idx_map);
178179

179180
my $max_bson_size = $link->max_bson_object_size;
180181
my $supports_document_validation = $link->supports_document_validation;
181182

182183
while (@left_to_send) {
183184
my $chunk = shift @left_to_send;
185+
my $chunk_idx_map = shift @sending_idx_map;
184186
# for update/insert, pre-encode docs as they need custom BSON handling
185187
# that can't be applied to an entire write command at once
186188
if ( $cmd eq 'update' ) {
@@ -242,6 +244,7 @@ sub _execute_write_command_batch {
242244
}
243245
else {
244246
unshift @left_to_send, $self->_split_chunk( $chunk, $error->size );
247+
unshift @sending_idx_map, $self->_split_chunk( $chunk_idx_map, $error->size );
245248
}
246249
}
247250
elsif ( $error->$_can( 'result' ) ) {
@@ -250,10 +253,11 @@ sub _execute_write_command_batch {
250253
# check for write errors, as they have a higher priority than
251254
# write concern errors.
252255
MongoDB::BulkWriteResult->_parse_cmd_result(
253-
op => $type,
256+
op => $type,
254257
op_count => scalar @$chunk,
255-
result => $error->result,
256-
cmd_doc => $cmd_doc,
258+
result => $error->result,
259+
cmd_doc => $cmd_doc,
260+
idx_map => $chunk_idx_map,
257261
)->assert_no_write_error;
258262
# Explode with original error
259263
die $error;
@@ -270,6 +274,7 @@ sub _execute_write_command_batch {
270274
op_count => scalar @$chunk,
271275
result => $cmd_result,
272276
cmd_doc => $cmd_doc,
277+
idx_map => $chunk_idx_map,
273278
);
274279

275280
# append corresponding ops to errors
@@ -308,42 +313,54 @@ sub _batch_ordered {
308313

309314
my $max_batch_count = $link->max_write_batch_size;
310315

316+
my $queue_idx = 0;
311317
for my $op (@$queue) {
312318
my ( $type, $doc ) = @$op;
313319
if ( $type ne $last_type || $count == $max_batch_count ) {
314-
push @batches, [ $type => [$doc] ];
320+
push @batches, [ $type => [$doc], [$queue_idx] ];
315321
$last_type = $type;
316322
$count = 1;
317323
}
318324
else {
319-
push @{ $batches[-1][-1] }, $doc;
325+
push @{ $batches[-1][1] }, $doc;
326+
push @{ $batches[-1][2] }, $queue_idx;
320327
$count++;
321328
}
329+
$queue_idx++;
322330
}
323331

324332
return @batches;
325333
}
326334

327335
sub _batch_unordered {
328336
my ( $self, $link, $queue ) = @_;
329-
my %batches = map { ; $_ => [ [] ] } keys %OP_MAP;
337+
my %batches = map { $_ => [ [] ] } keys %OP_MAP;
338+
my %queue_map = map { $_ => [ [] ] } keys %OP_MAP;
330339

331340
my $max_batch_count = $link->max_write_batch_size;
332341

342+
my $queue_idx = 0;
333343
for my $op (@$queue) {
334344
my ( $type, $doc ) = @$op;
335345
if ( @{ $batches{$type}[-1] } == $max_batch_count ) {
336346
push @{ $batches{$type} }, [$doc];
347+
push @{ $queue_map{$type} }, [ $queue_idx ];
337348
}
338349
else {
339350
push @{ $batches{$type}[-1] }, $doc;
351+
push @{ $queue_map{$type}[-1] }, $queue_idx;
340352
}
353+
$queue_idx++;
341354
}
342355

343356
# insert/update/delete are guaranteed to be in random order on Perl 5.18+
344357
my @batches;
345358
for my $type ( grep { scalar @{ $batches{$_}[-1] } } keys %batches ) {
346-
push @batches, map { [ $type => $_ ] } @{ $batches{$type} };
359+
push @batches, map { [
360+
$type,
361+
$batches{$type}[$_],
362+
$queue_map{$type}[$_], # array of indices from the original queue
363+
] } 0 .. $#{ $batches{$type} };
347364
}
348365
return @batches;
349366
}

‎t/bulk.t

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ subtest "unordered batch with errors" => sub {
996996
is( $details->modified_count, ( $server_does_bulk ? 0 : undef ), "modified_count" );
997997
is( $details->count_write_errors, 3, "writeError count" )
998998
or diag _truncate explain $details;
999-
cmp_deeply( $details->upserted, [ { index => 4, _id => obj_isa("BSON::OID") }, ],
999+
cmp_deeply( $details->upserted, [ { index => 2, _id => obj_isa("BSON::OID") }, ],
10001000
"upsert list" );
10011001
}
10021002
else {
@@ -1011,8 +1011,8 @@ subtest "unordered batch with errors" => sub {
10111011
cmp_deeply(
10121012
$details->upserted,
10131013
[
1014-
{ index => num(0), _id => obj_isa("BSON::OID") },
1015-
{ index => num(1), _id => obj_isa("BSON::OID") },
1014+
{ index => 1, _id => obj_isa("BSON::OID") },
1015+
{ index => 2, _id => obj_isa("BSON::OID") },
10161016
],
10171017
"upsert list"
10181018
);
@@ -1062,7 +1062,7 @@ subtest "ordered batch with errors" => sub {
10621062
cmp_deeply(
10631063
$details->write_errors->[0]{op},
10641064
{
1065-
q => methods(['FETCH','b'] => 2 ),
1065+
q => methods(['FETCH','b'] => 3 ),
10661066
u => obj_isa( $server_does_bulk ? 'BSON::Raw' : 'Tie::IxHash' ),
10671067
multi => false,
10681068
upsert => true,

‎t/crud_spec.t

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ use MongoDBTest qw/
3636
skip_unless_min_version
3737
/;
3838

39+
use MongoDBSpecTest qw/
40+
skip_unless_run_on
41+
/;
42+
3943
skip_unless_mongod();
4044

4145
plan skip_all => "Not testing with BSON wrappers"
@@ -47,7 +51,7 @@ my $server_type = server_type($conn);
4751
my $features = get_features($conn);
4852
my $coll = $testdb->get_collection('test_collection');
4953

50-
for my $dir ( map { path("t/data/CRUD/v2/$_") } qw/read write pipelines/ ) {
54+
for my $dir ( ( map { path("t/data/CRUD/v1/$_") } qw/read write/ ), path("t/data/CRUD/v2") ) {
5155
my $iterator = $dir->iterator( { recurse => 1 } );
5256
while ( my $path = $iterator->() ) {
5357
next unless -f $path && $path =~ /\.json$/;
@@ -59,6 +63,7 @@ for my $dir ( map { path("t/data/CRUD/v2/$_") } qw/read write pipelines/ ) {
5963
my $name = $path->relative($dir)->basename(".json");
6064

6165
subtest $name => sub {
66+
skip_unless_run_on($plan->{runOn}, $conn);
6267
if ( $name =~ 'arrayFilter' && ! $features->supports_arrayFilters ) {
6368
plan skip_all => "arrayFilters not supported on this mongod";
6469
}
@@ -68,15 +73,16 @@ for my $dir ( map { path("t/data/CRUD/v2/$_") } qw/read write pipelines/ ) {
6873
}
6974
for my $test ( @{ $plan->{tests} } ) {
7075
$coll->drop;
71-
$coll->insert_many( $plan->{data} );
76+
$coll->insert_many( $plan->{data} )ifexists$plan->{data};
7277
foreach my $op ( @{ $test->{'operations'} || [$test->{'operation'}] } ) {
7378
my $meth = $op->{name};
7479
my $object = $op->{'object'} || 'collection';
80+
my $outcome = exists $test->{'operations'} ? { result => $op->{result} } : $test->{outcome};
7581
local $ENV{PERL_MONGO_NO_DEP_WARNINGS} = 1 if $meth eq 'count';
7682
$meth =~ s{([A-Z])}{_\L1ドル}g;
7783
my $test_meth = "test_${meth}_${object}";
7884
my $res = main->$test_meth( $test->{description}, $meth, $op->{arguments},
79-
$test->{outcome} );
85+
$outcome );
8086
}
8187
}
8288
};
@@ -107,8 +113,19 @@ sub test_write_w_filter {
107113

108114
sub test_insert {
109115
my ( $class, $label, $method, $args, $outcome ) = @_;
116+
my $options = delete $args->{options};
110117
$args = delete $args->{document} || delete $args->{documents};
111-
my $res = $coll->$method($args);
118+
my $res;
119+
eval { $res = $coll->$method($args, $options) };
120+
if ( my $err = $@ ) {
121+
if ( $outcome->{error} ) {
122+
# we were expecting this, so we'l just unpack slightly
123+
$res = $err->result;
124+
} else {
125+
diag $err;
126+
fail 'Error encountered when not expected';
127+
}
128+
}
112129
check_insert_outcome( $label, $res, $outcome );
113130
}
114131

@@ -200,16 +217,39 @@ sub test_bulk_write_collection {
200217
my $req_method = $request->{name};
201218
my $arg = $request->{arguments};
202219
$req_method =~ s{([A-Z])}{_\L1ドル}g;
203-
my $filter = delete $arg->{filter};
204-
my $update = delete $arg->{update};
205-
my $arr_filters = delete $arg->{arrayFilters};
206-
my $bulk_view = $bulk->find( $filter );
207-
if ( scalar( @$arr_filters ) ) {
208-
$bulk_view = $bulk_view->arrayFilters( $arr_filters );
220+
221+
# insert_one is slightly different
222+
if ( $req_method eq 'insert_one' ) {
223+
$bulk->insert_one( $arg->{document} );
224+
}
225+
else {
226+
my $filter = delete $arg->{filter};
227+
my $method_arg = exists $arg->{update}
228+
? delete $arg->{update}
229+
: exists $arg->{replacement}
230+
? delete $arg->{replacement}
231+
: undef;
232+
my $arr_filters = delete $arg->{arrayFilters};
233+
my $bulk_view = $bulk->find( $filter || {} );
234+
if ( defined $arr_filters && scalar( @$arr_filters ) ) {
235+
$bulk_view = $bulk_view->arrayFilters( $arr_filters );
236+
}
237+
if ( $arg->{upsert} ) { $bulk_view = $bulk_view->upsert() }
238+
if ( $arg->{collation} ) { $bulk_view = $bulk_view->collation( $arg->{collation} ) }
239+
$bulk_view->$req_method( $method_arg );
240+
}
241+
}
242+
my $res;
243+
eval { $res = $bulk->execute };
244+
if ( my $err = $@ ) {
245+
if ( $outcome->{error} ) {
246+
# we were expecting this, so we'l just unpack slightly
247+
$res = $err->result;
248+
} else {
249+
diag $err;
250+
fail 'Error encountered when not expected';
209251
}
210-
$bulk_view->$req_method( $update );
211252
}
212-
my $res = $bulk->execute;
213253

214254
check_write_outcome( $label, $res, $outcome );
215255
}
@@ -238,28 +278,16 @@ sub test_aggregate_collection {
238278
sub test_aggregate_database {
239279
my ( $class, $label, $method, $args, $outcome ) = @_;
240280

241-
skip_unless_min_version($conn, 'v3.6.0');
242-
243-
plan skip_all => "mongos mangles commands too much vs test expectations"
244-
if $server_type eq 'Mongos';
281+
# TODO For some reason this test doesnt work on single mongod? see v2/db-aggregate spec test
282+
plan skip_all => "Single server doesnt give dummy output"
283+
if $server_type eq 'Standalone';
245284

246285
my $pipeline = delete $args->{pipeline};
286+
247287
my $res = $conn->get_database('admin')->aggregate($pipeline, $args);
288+
248289
is($res->{'_full_name'}, 'admin.$cmd.aggregate', 'check DB aggregate full name');
249-
my $got = [ $res->all ]->[0]{'command'};
250-
my $result = $outcome->{'result'}[0]{'command'};
251-
$result->{'cursor'} = ignore();
252-
$result->{'pipeline'}[0]{'$currentOp'} = noclass(
253-
superhashof($result->{'pipeline'}[0]{'$currentOp'})
254-
);
255-
$result->{'pipeline'}[2]{'$project'} = ignore();
256-
$result->{'pipeline'}[3]{'$project'} = ignore();
257-
258-
cmp_deeply(
259-
$got,
260-
noclass( superhashof($result) ),
261-
"$label: compare",
262-
) or diag explain $got;
290+
is_deeply( [ $res->all ], $outcome->{result}, "$label: compare" )
263291
}
264292

265293
sub test_distinct_collection {
@@ -328,7 +356,9 @@ sub check_insert_outcome {
328356
return check_write_outcome( $label, $res, $outcome );
329357
}
330358

331-
cmp_deeply( $res->inserted_ids , $outcome->{result}{insertedIds}, "$label: result doc" );
359+
if ( exists $outcome->{result}{insertedIds} ) {
360+
cmp_deeply( $res->inserted_ids , $outcome->{result}{insertedIds}, "$label: result doc" );
361+
}
332362
check_collection( $label, $outcome );
333363
}
334364

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /