1+ using DtmCommon ;
2+ using System . Threading . Tasks ;
3+ using Microsoft . Extensions . Logging ;
4+ using System ;
5+ using MongoDB . Driver ;
6+ using System . Linq ;
7+ using System . Collections . Generic ;
8+ 9+ namespace DtmMongoBarrier
10+ {
11+ public static class MongoBranchBarrier
12+ {
13+ public static async Task MongoCall ( this BranchBarrier bb , IMongoClient mc , Func < IClientSessionHandle , Task > busiCall )
14+ {
15+ bb . BarrierID = bb . BarrierID + 1 ;
16+ var bid = bb . BarrierID . ToString ( ) . PadLeft ( 2 , '0' ) ;
17+ 18+ var session = await mc . StartSessionAsync ( ) ;
19+ 20+ session . StartTransaction ( ) ;
21+ 22+ try
23+ {
24+ var originOp = Constant . Barrier . OpDict . TryGetValue ( bb . Op , out var ot ) ? ot : string . Empty ;
25+ 26+ var ( originAffected , oErr ) = await MongoInsertBarrier ( bb , session , bb . BranchID , originOp , bid , bb . Op ) ;
27+ var ( currentAffected , rErr ) = await MongoInsertBarrier ( bb , session , bb . BranchID , bb . Op , bid , bb . Op ) ;
28+ 29+ bb ? . Logger ? . LogDebug ( "mongo originAffected: {originAffected} currentAffected: {currentAffected}" , originAffected , currentAffected ) ;
30+ 31+ if ( bb . IsMsgRejected ( rErr , bb . Op , currentAffected ) )
32+ throw new DtmDuplicatedException ( ) ;
33+ 34+ var isNullCompensation = bb . IsNullCompensation ( bb . Op , originAffected ) ;
35+ var isDuplicateOrPend = bb . IsDuplicateOrPend ( currentAffected ) ;
36+ 37+ if ( isNullCompensation || isDuplicateOrPend )
38+ {
39+ bb ? . Logger ? . LogInformation ( "mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}" , isNullCompensation , isDuplicateOrPend ) ;
40+ await session . CommitTransactionAsync ( ) ;
41+ return ;
42+ }
43+ 44+ try
45+ {
46+ await busiCall . Invoke ( session ) ;
47+ }
48+ catch
49+ {
50+ throw ;
51+ }
52+ 53+ await session . CommitTransactionAsync ( ) ;
54+ }
55+ catch ( Exception ex )
56+ {
57+ bb ? . Logger ? . LogError ( ex , "Mongo Call error, gid={gid}, trans_type={trans_type}" , bb . Gid , bb . TransType ) ;
58+ 59+ await session . AbortTransactionAsync ( ) ;
60+ 61+ throw ;
62+ }
63+ }
64+ 65+ public static async Task < string > MongoQueryPrepared ( this BranchBarrier bb , IMongoClient mc )
66+ {
67+ var session = await mc . StartSessionAsync ( ) ;
68+ 69+ try
70+ {
71+ await MongoInsertBarrier (
72+ bb ,
73+ session ,
74+ Constant . Barrier . MSG_BRANCHID ,
75+ Constant . TYPE_MSG ,
76+ Constant . Barrier . MSG_BARRIER_ID ,
77+ Constant . Barrier . MSG_BARRIER_REASON ) ;
78+ }
79+ catch ( Exception ex )
80+ {
81+ bb ? . Logger ? . LogWarning ( ex , "Mongo Insert Barrier error, gid={gid}" , bb . Gid ) ;
82+ return ex . Message ;
83+ }
84+ 85+ var reason = string . Empty ;
86+ 87+ try
88+ {
89+ var fs = bb . DtmOptions . BarrierTableName . Split ( '.' ) ;
90+ var barrier = session . Client . GetDatabase ( fs [ 0 ] ) . GetCollection < DtmBarrierDocument > ( fs [ 1 ] ) ;
91+ 92+ var filter = BuildFilters ( bb . Gid , Constant . Barrier . MSG_BRANCHID , Constant . TYPE_MSG , Constant . Barrier . MSG_BARRIER_ID ) ;
93+ var cursor = await barrier . FindAsync < DtmBarrierDocument > ( filter ) ;
94+ var res = await cursor . ToListAsync ( ) ;
95+ 96+ if ( res != null && res . Any ( ) )
97+ {
98+ reason = res . First ( ) . Reason ;
99+ if ( reason . Equals ( Constant . Barrier . MSG_BARRIER_REASON ) ) return Constant . ResultFailure ;
100+ }
101+ }
102+ catch ( Exception ex )
103+ {
104+ bb ? . Logger ? . LogWarning ( ex , "Mongo Query Prepared error, gid={gid}" , bb . Gid ) ;
105+ return ex . Message ;
106+ }
107+ 108+ return string . Empty ;
109+ }
110+ 111+ private static async Task < ( int , string ) > MongoInsertBarrier ( BranchBarrier bb , IClientSessionHandle session , string branchId , string op , string bid , string reason )
112+ {
113+ var err = string . Empty ;
114+ if ( session == null ) return ( - 1 , err ) ;
115+ if ( string . IsNullOrWhiteSpace ( op ) ) return ( 0 , err ) ;
116+ 117+ var fs = bb . DtmOptions . BarrierTableName . Split ( '.' ) ;
118+ var barrier = session . Client . GetDatabase ( fs [ 0 ] ) . GetCollection < DtmBarrierDocument > ( fs [ 1 ] ) ;
119+ 120+ List < DtmBarrierDocument > res = null ;
121+ 122+ try
123+ {
124+ var filter = BuildFilters ( bb . Gid , branchId , op , bid ) ;
125+ var cursor = await barrier . FindAsync < DtmBarrierDocument > ( filter ) ;
126+ res = await cursor . ToListAsync ( ) ;
127+ }
128+ catch ( Exception ex )
129+ {
130+ err = ex . Message ;
131+ bb ? . Logger ? . LogDebug ( ex , "Find document exception here, gid={gid}, branchId={branchId}, op={op}, bid={bid}" , bb . Gid , branchId , op , bid ) ;
132+ }
133+ 134+ if ( res == null || res . Count <= 0 )
135+ {
136+ try
137+ {
138+ await barrier . InsertOneAsync ( new DtmBarrierDocument
139+ {
140+ TransType = bb . TransType ,
141+ GId = bb . Gid ,
142+ BranchId = bb . BranchID ,
143+ Op = op ,
144+ BarrierId = bid ,
145+ Reason = reason
146+ } ) ;
147+ }
148+ catch ( Exception ex )
149+ {
150+ err = ex . Message ;
151+ }
152+ 153+ return ( 1 , err ) ;
154+ }
155+ 156+ return ( 0 , err ) ;
157+ }
158+ 159+ private static FilterDefinition < DtmBarrierDocument > BuildFilters ( string gid , string branchId , string op , string barrierId )
160+ {
161+ return new FilterDefinitionBuilder < DtmBarrierDocument > ( ) . And (
162+ Builders < DtmBarrierDocument > . Filter . Eq ( x => x . GId , gid ) ,
163+ Builders < DtmBarrierDocument > . Filter . Eq ( x => x . BranchId , branchId ) ,
164+ Builders < DtmBarrierDocument > . Filter . Eq ( x => x . Op , op ) ,
165+ Builders < DtmBarrierDocument > . Filter . Eq ( x => x . BarrierId , barrierId )
166+ ) ;
167+ }
168+ }
169+ }
0 commit comments