Most performant method to update many records? #801
-
I use Postgres 12, and a few times a day I need to update 30-80k rows, ideally in less than a minute.
Using SQLAlchemy / psycopg2's bulk_update_mappings is not really performant; it's taking 10+ minutes even with chunking the data and using multithreading. So I started looking for options, which led me to asyncpg.
Account model:
class Account(Base):
    __tablename__ = 'accounts'
    address = Column(String, primary_key=True)
    protocols_used = Column(ARRAY(String))
A snippet of what I'm doing now (minus the chunking & multithreading, for readability). My desired outcome is a significant speed improvement on this; 1k row updates per second would be ideal, though I'm not sure what is realistic.
users_to_update_data = [acc for acc in all_accounts_data if acc['address'] in users_to_update]

updates = []
for account_data in users_to_update_data:
    updates.append(
        {
            "address": account_data['address'],
            "protocols_used": account_data['protocols_used'] + [protocol]
        }
    )

session.bulk_update_mappings(Account, updates)
From what I understand, this could be a job for executemany()? I can prepare a list of either dicts or tuples (I believe tuples are more efficiently iterated due to their immutability?). I failed to find any code examples in the docs or elsewhere. I would be very grateful if someone could point me in the right direction with an example or suggestions. Thanks!
-
Indeed, use executemany:

updates = [
    (account_id, new_address, additional_protocol)
    for account_id, new_address, additional_protocol in <data_source>
]

await connection.executemany(
    """
    UPDATE accounts
    SET address = 2,ドル protocols_used = array_append(protocols_used, 3ドル)
    WHERE id = 1ドル
    """,
    updates,
)
-
Awesome!
-
Thank you so much!
I refactored my script to use your suggestion, plus sets instead of lists, and its total duration is now 21s instead of 20 minutes! That involves fetching 500k records and updating 50k of them.
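For anyone landing here later, a rough sketch of what that refactor might look like for the Account model above (names like db_params, all_accounts_data, users_to_update, and protocol are placeholders, not code from the actual script):

import asyncpg

async def append_protocol_to_users(db_params, all_accounts_data, users_to_update, protocol):
    # users_to_update is a set, so the membership check below is O(1)
    updates = [
        (protocol, acc['address'])
        for acc in all_accounts_data
        if acc['address'] in users_to_update
    ]
    connection = await asyncpg.connect(**db_params)
    try:
        # One prepared statement, executed for the whole batch of parameter tuples
        await connection.executemany(
            """
            UPDATE accounts
            SET protocols_used = array_append(protocols_used, 1ドル)
            WHERE address = 2ドル
            """,
            updates,
        )
    finally:
        await connection.close()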
-
Did you use asyncpg to fetch records as well?
-
Yes, but it's a bit slower than the SQLAlchemy equivalent. Maybe because of the connection management?
async def get_all_accounts_asyncpg():
    """
    Returns all accounts in the database
    """
    connection = await asyncpg.connect(**db_params)
    query = await connection.fetch('SELECT * FROM accounts')
    await connection.close()
    return [dict(r) for r in query]

def get_all_accounts_sqlalchemy():
    """
    Returns all accounts in the database
    """
    query = session.query(Account).all()
    return [r.address for r in query]
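If it is the per-call connect/close overhead, a pooled variant would look roughly like this (just a sketch; the pool would be created once at application startup):

async def get_all_accounts_pooled(pool):
    # Reuse a long-lived pool instead of opening a new connection per call
    async with pool.acquire() as connection:
        return await connection.fetch('SELECT * FROM accounts')

# created once at startup:
# pool = await asyncpg.create_pool(**db_params)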
-
You don't need to cast asyncpg.Record to dict, it's already dict-like. Remove the cast and see what happens.
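For illustration (the values here are made up), a Record supports both name and index access directly:

rows = await connection.fetch('SELECT * FROM accounts')
first = rows[0]
first['address']     # access by column name, like a dict
first[0]             # access by position, like a tuple
list(first.keys())   # column names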
-
Good part of the day!
I would like to construct a data structure similar to asyncpg.Record for my project. Is it possible with only pure Python?
Thanks!
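A rough pure-Python sketch of a dict-like, indexable record (the SimpleRecord name is illustrative; note asyncpg's own Record is a C extension type, so a pure-Python version will be slower):

class SimpleRecord:
    """Read-only row: supports r['col'], r[0], and .keys()/.values()/.items()."""

    __slots__ = ('_keys', '_values')

    def __init__(self, keys, values):
        self._keys = tuple(keys)
        self._values = tuple(values)

    def __getitem__(self, key):
        # Integer -> positional access; string -> lookup by column name
        if isinstance(key, int):
            return self._values[key]
        return self._values[self._keys.index(key)]

    def keys(self):
        return self._keys

    def values(self):
        return self._values

    def items(self):
        return tuple(zip(self._keys, self._values))

row = SimpleRecord(('address', 'protocols_used'), ('0xabc', ['aave']))
row['address']   # '0xabc'
row[1]           # ['aave']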