
Most performant method to update many records? #801

Answered by elprans
StayFoolisj asked this question in Q&A

I use Postgres 12 and a few times a day I need to update 30-80k rows and ideally in less than a minute.

Using SQLAlchemy / psycopg2 bulk_update_mappings is not really performant; it's taking 10+ minutes even with chunking the data and using multithreading. So I started looking for options, which led me to asyncpg.

Account model:

from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import ARRAY

class Account(Base):
 __tablename__ = 'accounts'
 address = Column(String, primary_key=True)
 protocols_used = Column(ARRAY(String))

A snippet of what I'm doing now (minus chunking and multithreading, for readability). My desired outcome is a significant speed improvement on this; 1k row updates per second would be ideal, though I'm not sure what is realistic.

users_to_update_data = [acc for acc in all_accounts_data if acc['address'] in users_to_update]
updates = []
for account_data in users_to_update_data:
 updates.append(
 {
 "address": account_data['address'],
 "protocols_used": account_data['protocols_used'] + [protocol],
 }
 )
session.bulk_update_mappings(Account, updates)

From what I understand, this could be a job for executemany() ? I can prepare a list of either dicts or tuples (I believe tuples are more efficiently iterated due to their immutability?). I failed to find any code examples in the docs or elsewhere. Would be very grateful if someone could point me in the right direction with an example or suggestions. Thanks!


Indeed, use executemany:

updates = [(account_id, new_address, additional_protocol) from <data_source>]
await connection.executemany(
 """
 UPDATE accounts
 SET 
 address = 2,ドル
 protocols_used = array_append(protocols_used, 3ドル)
 WHERE
 id = 1ドル
 """,
 updates,
)
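Fleshed out against the question's model, where address is the primary key rather than id, the pattern looks roughly like this. Note that build_updates and the account-dict shape are my assumptions, not part of the original answer:

```python
# Hedged sketch of the executemany pattern above, keyed on the
# question's primary key (address). build_updates and the account
# dict shape are assumed.

def build_updates(accounts, protocol):
 """Turn account dicts into (protocol, address) parameter tuples."""
 return [(protocol, acc["address"]) for acc in accounts]

async def apply_updates(connection, accounts, protocol):
 # connection is an asyncpg.Connection; executemany sends the whole
 # batch through one prepared statement instead of one query per row.
 await connection.executemany(
 """
 UPDATE accounts
 SET protocols_used = array_append(protocols_used, 1ドル)
 WHERE address = 2ドル
 """,
 build_updates(accounts, protocol),
 )
```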

Awesome!

Answer selected by StayFoolisj

Thank you so much!

I refactored my script to use your suggestion, plus sets instead of lists, and the total run time is now 21s instead of 20 minutes! That involves fetching 500k records and updating 50k of them.
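For reference, the "sets instead of lists" change presumably refers to the membership test in the filtering step; a minimal sketch with invented sample data (variable names taken from the question):

```python
# Filtering with a set: `addr in users_to_update` is O(1) on average
# for a set but a linear scan for a list, which matters when filtering
# 500k records against 50k addresses. Sample data is invented.
users_to_update = {"0xabc", "0xdef"} # a set, not a list

all_accounts_data = [
 {"address": "0xabc", "protocols_used": ["uniswap"]},
 {"address": "0x123", "protocols_used": []},
]

users_to_update_data = [
 acc for acc in all_accounts_data if acc["address"] in users_to_update
]
```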


Did you use asyncpg to fetch records as well?

Yes, but it's a bit slower than the SQLAlchemy equivalent. Maybe because of the connection management?

import asyncpg

async def get_all_accounts_asyncpg():
 """Returns all accounts in the database."""
 connection = await asyncpg.connect(**db_params)
 records = await connection.fetch('SELECT * FROM accounts')
 await connection.close()
 return [dict(r) for r in records]

def get_all_accounts_sqlalchemy():
 """Returns the addresses of all accounts in the database."""
 accounts = session.query(Account).all()
 return [acc.address for acc in accounts]

You don't need to cast asyncpg.Record to dict, it's already dict-like. Remove the cast and see what happens.

Good day!
I would like to construct a data structure similar to asyncpg.Record for my project. Is that possible in pure Python?
Thanks!
