This is my declarative model:
import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_date = DateTime(default=datetime.datetime.utcnow)
However, when I try to import this module, I get this error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "orm/models2.py", line 37, in <module>
class Test(Base):
File "orm/models2.py", line 41, in Test
created_date = sqlalchemy.DateTime(default=datetime.datetime.utcnow)
TypeError: __init__() got an unexpected keyword argument 'default'
If I use an Integer type, I can set a default value. What's going on?
Comments:
Comment (Antti Haapala, Oct 6, 2016): This shouldn't be used. utcnow is a naive timestamp in the UTC timezone; however, chances are that naive timestamps are interpreted in the local timezone instead.
Comment (David, Feb 21, 2019): I know this question was asked a long time ago, but I think your answer should be changed to the one that @Jeff Widman provided, as the other will use the "compile"-time datetime from when the table class is defined, versus when the record is created. If you are AFK, then at least this comment will provide a warning for others to carefully check the comments for each question.
14 Answers
Calculate timestamps within your DB, not your client
For sanity, you probably want to have all datetimes calculated by your DB server, rather than the application server. Calculating the timestamp in the application can lead to problems because network latency is variable, clients experience slightly different clock drift, and different programming languages occasionally calculate time slightly differently.
SQLAlchemy allows you to do this by passing func.now() or func.current_timestamp() (they are aliases of each other) which tells the DB to calculate the timestamp itself.
Use SQLALchemy's server_default
Additionally, for a default where you're already telling the DB to calculate the value, it's generally better to use server_default instead of default. This tells SQLAlchemy to pass the default value as part of the CREATE TABLE statement.
For example, if you write an ad hoc script against this table, using server_default means you won't need to worry about manually adding a timestamp call to your script--the database will set it automatically.
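To see the difference concretely, here is a minimal sketch using Python's stdlib sqlite3 rather than SQLAlchemy (the table and column names are made up): because the DEFAULT clause lives in the table DDL, even a raw INSERT that never mentions the column gets a timestamp.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The DEFAULT clause is part of the table definition, analogous to server_default
conn.execute(
    "CREATE TABLE test (id INTEGER PRIMARY KEY,"
    " created_date TEXT DEFAULT CURRENT_TIMESTAMP)"
)
# A raw INSERT that doesn't mention created_date still gets a timestamp
conn.execute("INSERT INTO test (id) VALUES (1)")
row = conn.execute("SELECT created_date FROM test WHERE id = 1").fetchone()
print(row[0])  # the database-generated timestamp
```

With a client-side default instead, that raw INSERT would have left created_date NULL.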
Understanding SQLAlchemy's onupdate/server_onupdate
SQLAlchemy also supports onupdate so that anytime the row is updated it inserts a new timestamp. Again, best to tell the DB to calculate the timestamp itself:
from sqlalchemy.sql import func
time_created = Column(DateTime(timezone=True), server_default=func.now())
time_updated = Column(DateTime(timezone=True), onupdate=func.now())
There is a server_onupdate parameter, but unlike server_default, it doesn't actually set anything server-side. It just tells SQLAlchemy that your database will change the column when an update happens (perhaps you created a trigger on the column), so SQLAlchemy will ask for the return value and update the corresponding object.
One other potential gotcha:
You might be surprised to notice that if you make a bunch of changes within a single transaction, they all have the same timestamp. That's because the SQL standard specifies that CURRENT_TIMESTAMP returns values based on the start of the transaction.
PostgreSQL provides the non-SQL-standard statement_timestamp() and clock_timestamp() which do change within a transaction. Docs here: https://www.postgresql.org/docs/current/static/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
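A quick psql session sketch (PostgreSQL only) illustrating the difference:

```sql
BEGIN;
SELECT now();              -- transaction start time
SELECT pg_sleep(2);
SELECT now();              -- same value as the first call
SELECT clock_timestamp();  -- wall-clock time: roughly 2 seconds later
COMMIT;
```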
UTC timestamp
If you want to use UTC timestamps, a stub of implementation for func.utcnow() is provided in SQLAlchemy documentation. You need to provide appropriate driver-specific functions on your own though.
Comments:
Comment: server_default=func.now() worked for me, but onupdate=func.now() didn't. Tried onupdate=datetime.datetime.utcnow (without brackets); that also didn't help.
Comment: I used created_at = db.Column(db.DateTime, server_default=UtcNow()) and updated_at = db.Column(db.DateTime, server_default=UtcNow(), onupdate=UtcNow()), where UtcNow is a class defined as class UtcNow(expression.FunctionElement): type = DateTime(), combined with @compiles(UtcNow, 'postgresql') def pg_utc_now(element, compiler, **kw): return "TIMEZONE('utc', CURRENT_TIMESTAMP)".

DateTime doesn't have a default keyword argument; default should be passed to the Column function instead. Try this:
import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_date = Column(DateTime, default=datetime.datetime.utcnow)
Comments:
Comment: datetime.datetime.utcnow seems to be called as a callback.
Comment: Don't write default=datetime.datetime.utcnow(); you want to pass the utcnow function itself, not the result of evaluating it once at module load.
Comment: from sqlalchemy.sql import func; created_date = Column(DateTime(timezone=True), server_default=func.now())
Comment: server_default results in a default value stored at the SQL table definition level (you can see it if you do \d <your_table> in psql, for instance), and it is taken into account by raw INSERT queries issued outside the ORM. On the other hand, default only works at the ORM level: if you create a model object and save it, it will contain the default value (because the ORM explicitly set it), while a raw INSERT query would produce a record without the default value.

You can also use SQLAlchemy's built-in function for a default DateTime:
from sqlalchemy.sql import func
DT = Column(DateTime(timezone=True), default=func.now())
Comments:
Comment: Use server_default instead of default so the value will be handled by the database itself.
Comment: What does timezone=True do?

Using the default parameter with datetime.now:
from sqlalchemy import Column, Integer, DateTime
from datetime import datetime
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
Comments:
Comment: You likely want to use onupdate=datetime.now so that UPDATEs also change the last_updated field.
SQLAlchemy has two kinds of defaults for Python-executed functions:
default sets the value on INSERT, only once.
onupdate sets the value to the callable's result on UPDATE as well.
Comments:
Comment: onupdate doesn't do anything for me.
Comment: sqlalchemy.sql.func.now() is preferred, though it won't be too different. It's slightly better to use DB time instead of application-server time if the system is distributed and many different users may make changes around the same time.

The default keyword parameter should be given to the Column object.
Example:
Column(u'timestamp', TIMESTAMP(timezone=True), primary_key=False, nullable=False, default=time_now),
The default value can be a callable, which here I defined like the following.
from pytz import timezone
from datetime import datetime
UTC = timezone('UTC')
def time_now():
    return datetime.now(UTC)
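If you'd rather not depend on pytz, the standard library can produce the same timezone-aware UTC datetime. A minimal sketch (the time_now name just mirrors the helper above):

```python
from datetime import datetime, timezone

def time_now():
    # Return a timezone-aware datetime in UTC, suitable as a Column default callable
    return datetime.now(timezone.utc)
```

Like the pytz version, pass time_now itself (no parentheses) as the default so it is evaluated per row.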
For MariaDB, this worked for me:
from sqlalchemy import Column, Integer, String, DateTime, TIMESTAMP, text
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = "test"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False)
    email = Column(String(255), nullable=False)
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    updated_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
In the SQLAlchemy documentation for MariaDB, it is recommended to import text from sqlalchemy itself and set server_default with text(), inserting the custom command:
updated_at=Column(DateTime, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
To understand func.now, you can read the SQLAlchemy documentation.
Hope I helped in some way.
TL;DR: use version-controlled database triggers to automatically update the updated_at field, and make the ORM aware of them so it issues a RETURNING statement.
WHY? Because we want to avoid relying purely on the ORM's onupdate to make sure the updated_at field is up to date.
NOTE: this code was designed for PostgreSQL, but would be similar for other databases.
To extend on this answer, for a default value you should use server_default as suggested. However, for a value that is refreshed on every UPDATE statement, you have 2 options. The first is described in the linked answer. The second option is using database triggers. I found alembic-utils useful for making sure the trigger is migrated and updated when its version-controlled definition is modified. Combining it with a Base model and a listing of the tables allowed adding these created/updated fields to all ORM models.
from datetime import datetime
from sqlalchemy import FetchedValue, Identity, MetaData, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# `convention` is assumed to be a naming-convention dict defined elsewhere

class Base(DeclarativeBase):
    __abstract__ = True
    metadata = MetaData(naming_convention=convention)

    id: Mapped[int] = mapped_column(Identity(), primary_key=True)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=FetchedValue(), server_onupdate=FetchedValue()
    )

    # IMPORTANT! see details below
    __mapper_args__ = {"eager_defaults": True}

class Customer(Base):
    __tablename__ = "customers"
    name: Mapped[str] = mapped_column(nullable=False)
triggers.py:
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger
from alembic_utils.replaceable_entity import register_entities
# import your `Base` module - ONLY AFTER all other modules were imported as well
# that's anyway required for handling the metadata in `alembic`.
from ... import Base
update_update_time_func = PGFunction(
    schema="public",
    signature="update_updated_at()",
    definition="""
    RETURNS TRIGGER AS $$
    BEGIN
        NEW.updated_at = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC');
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql
    """,
)
register_entities([update_update_time_func])

update_time_triggers = [
    PGTrigger(
        schema="public",
        signature=f"update_{tb_name}_updated_at",
        on_entity=tb_name,
        definition=f"""
        BEFORE INSERT OR UPDATE ON {tb_name}
        FOR EACH ROW EXECUTE FUNCTION update_updated_at()
        """,
    )
    for tb_name, tb in Base.metadata.tables.items()
    if any(c.name == "updated_at" for c in tb.columns)
]
register_entities(update_time_triggers)
In migrations/env.py import your triggers module.
It will register the trigger entities and the stored procedure.
Running alembic revision --autogenerate -m "add triggers" should identify the function and triggers, and alembic upgrade head should add the function and triggers.
Eager defaults and performance: when implementing this solution, both of the following were important to me:
- After an INSERT or UPDATE operation (done via the ORM), the instance at hand is updated with the database-generated values (the created and updated timestamps, in this case).
- The operation does not trigger a separate SELECT, but instead uses RETURNING where appropriate.
To demonstrate the above consider the following code:
with Session() as session:
    c = Customer(name='foo')
    session.add(c)
    print(f'Instance before INSERT: {c}')
    print('INSERT logs:')
    session.commit()
    print(f'Instance after INSERT: {c}')
    c.name = 'bar'
    print('UPDATE logs:')
    session.commit()
    print(f'Instance after UPDATE: {c}')
Which produces these logs (slightly edited for readability):
Instance before INSERT:
<Customer(name='foo')>
INSERT logs:
BEGIN (implicit)
INSERT INTO customers (name) VALUES (%(name)s) RETURNING customers.id, customers.created_at, customers.updated_at
[cached since 1.849e+04s ago] {'name': 'foo'}
COMMIT
Instance after INSERT:
<Customer(name='foo', id=8, created_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627), updated_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627))>
UPDATE logs:
BEGIN (implicit)
UPDATE customers SET name=%(name)s WHERE customers.id = %(customers_id)s RETURNING customers.updated_at
[cached since 220s ago] {'name': 'bar', 'customers_id': 8}
COMMIT
Instance after UPDATE:
<Customer(name='bar', id=8, created_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627), updated_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 970578))>
As per PostgreSQL documentation:
now, CURRENT_TIMESTAMP, LOCALTIMESTAMP return the time of transaction
This is considered a feature: the intent is to allow a single transaction to have a consistent notion of the "current" time, so that multiple modifications within the same transaction bear the same time stamp.
You might want to use statement_timestamp or clock_timestamp if you don't want the transaction timestamp.
statement_timestamp()
returns the start time of the current statement (more specifically, the time of receipt of the latest command message from the client).
clock_timestamp()
returns the actual current time, and therefore its value changes even within a single SQL command.
Jeff Widman said in his answer that you need to create your own implementation of UTC timestamps for func.utcnow().
As I didn't want to implement it myself, I searched for and found a Python package which already does the job and is maintained by many people.
The package name is spoqa/sqlalchemy-utc.
Long story short, UtcDateTime does:
take only aware datetime.datetime,
return only aware datetime.datetime,
never take or return naive datetime.datetime,
ensure timestamps in database always to be encoded in UTC, and
work as you’d expect.
Note that for server_default=func.now() and onupdate=func.now() to work:
Local_modified = Column(DateTime, server_default=func.now(), onupdate=func.now())
you need to set DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP in your table DDL.
For example
create table test
(
id int auto_increment
primary key,
source varchar(50) null,
Local_modified datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
collate=utf8mb4_bin;
Otherwise, server_default=func.now(), onupdate=func.now() has no effect.
None of the posted answers result in having a database-maintained updated_at.
This is slightly complex to do in PostgreSQL (vs. MySQL) because you must maintain the ON UPDATE trigger.
The below code works when using either sqlalchemy's create_all or alembic's --autogenerate. It supports upgrade and downgrade migration actions, but does not handle other situations such as renaming a table or deleting a table.
(You won't see the triggers in the migration generated by alembic, but the on_* events are executed during execution of the migration.)
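Before the full PostgreSQL implementation below, here is a minimal stdlib-only sketch of the same idea using sqlite3 (table name is made up; SQLite trigger syntax differs from plpgsql): a trigger keeps updated_at maintained by the database itself, so any UPDATE refreshes it regardless of what the client sends.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE my_table (
    id INTEGER PRIMARY KEY,
    name TEXT,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- DB-maintained updated_at: fires on every UPDATE, no client cooperation needed
CREATE TRIGGER my_table_updated_at
AFTER UPDATE ON my_table
FOR EACH ROW
BEGIN
    UPDATE my_table
    SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
    WHERE id = NEW.id;
END;
""")
conn.execute("INSERT INTO my_table (id, name) VALUES (1, 'foo')")
before = conn.execute("SELECT updated_at FROM my_table WHERE id = 1").fetchone()[0]
conn.execute("UPDATE my_table SET name = 'bar' WHERE id = 1")
after = conn.execute("SELECT updated_at FROM my_table WHERE id = 1").fetchone()[0]
# `after` now carries a fresh, trigger-written timestamp
```

Note the trigger's inner UPDATE does not recurse because SQLite's recursive_triggers setting is off by default.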
Example usage is below:
my_db_lib/models.py
from sqlalchemy.orm import Mapped, mapped_column
from my_db_lib.timestamps import BaseWithStamps
class Base(BaseWithStamps):
    pass

class MyTable(Base):
    __tablename__ = 'my_table'
    id: Mapped[int] = mapped_column(primary_key=True)
    # other table columns
    # BaseWithStamps includes `created_at` and `updated_at` by default
Following are the dependencies for provisioning the above example:
my_db_lib/alembic.ini
[alembic]
script_location = %(here)s/alembic
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
prepend_sys_path = .
timezone = UTC
revision_environment = true
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
my_db_lib/alembic/env.py
from alembic import context
from logging.config import fileConfig
from my_db_lib.helpers import get_schema
from my_db_lib.models import Base
from my_db_lib.timestamps import update_stamp_function
from sqlalchemy import engine_from_config
from sqlalchemy import pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# this is the script EnvironmentContext
script = context.script
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# the DeclarativeBase to target
Target = Base
##
# Runners
##
_configure_kwargs = {
    'target_metadata': Target.metadata,
    'version_table_schema': get_schema(Target),
    'compare_server_default': True,
    'compare_type': True,
}
def run_migrations_offline() -> None:
    '''
    Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    '''
    url = config.get_main_option('sqlalchemy.url')
    context.configure(
        url=url,
        literal_binds=True,
        dialect_opts={'paramstyle': 'named'},
        **_configure_kwargs
    )
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    '''
    Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    '''
    engine = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with engine.connect() as connection:
        context.configure(
            connection=connection,
            **_configure_kwargs
        )
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
my_db_lib/helpers.py
from sqlalchemy.orm import DeclarativeBase
from typing import Type
def has_table(BaseClass: Type[DeclarativeBase]):
    # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/orm/decl_base.py#L1821-L1830
    for attr in ('__tablename__', '__table__'):
        try:
            attr_val = getattr(BaseClass, attr)
        except AttributeError:
            pass
        else:
            if attr_val:
                return True
    return False

def get_tablename(BaseClass: Type[DeclarativeBase]):
    return BaseClass.__tablename__ if BaseClass.__tablename__ else BaseClass.__table__.name

def get_schema(BaseClass: Type[DeclarativeBase], default_schema='public'):
    try:
        table_args = BaseClass.__table_args__
    except AttributeError:
        pass
    else:
        schema: str = table_args.get('schema')
        if schema:
            # schema is defined for this single table
            return schema
    try:
        metadata = BaseClass.metadata
    except AttributeError:
        pass
    else:
        try:
            schema = metadata.schema
        except AttributeError:
            pass
        else:
            if schema:
                # schema is defined as default for all tables
                return schema
    return default_schema
my_db_lib/events.py
from functools import partial
from sqlalchemy import DDL, Table
from sqlalchemy.engine import Connectable
from sqlalchemy.event import listen
from sqlalchemy.orm import DeclarativeBase
from typing import Type
def on_after_create(*args, **kwargs):
    return _on_event('after_create', *args, **kwargs)

def on_before_drop(*args, **kwargs):
    return _on_event('before_drop', *args, **kwargs)

# https://stackoverflow.com/a/34029220
def _on_event(
    event: str,
    BaseClass: Type[DeclarativeBase],
    ddl: DDL,
):
    def listener(
        tablename: str,
        ddl: DDL,
        table: Table,
        bind: Connectable,
        **kwargs
    ):
        if table.name == tablename:
            ddl(table, bind, **kwargs)

    listen(
        Table,
        event,
        partial(listener, BaseClass.__tablename__, ddl)
    )
my_db_lib/timestamps.py
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger
from datetime import datetime
from sqlalchemy import DDL, types
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.schema import FetchedValue
from sqlalchemy.sql import expression
from my_db_lib.events import on_after_create, on_before_drop
from my_db_lib.helpers import get_schema, get_tablename, has_table
from typing import Type
class UtcNow(expression.FunctionElement):
    # https://stackoverflow.com/q/13370317#comment90734051_31484933
    type = types.DateTime()

@compiles(UtcNow, 'postgresql')
def pg_utc_now(element, compiler, **kw):
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"

def make_stamp_function(BaseClass: Type[DeclarativeBase]):
    schema = get_schema(BaseClass)
    return PGFunction(
        schema=schema,
        signature='update_stamp()',
        definition=('''
            RETURNS
                TRIGGER
            AS $$
            BEGIN
                NEW.updated_at = now();
                RETURN NEW;
            END;
            $$ LANGUAGE 'plpgsql'
            ;
        ''')
    )

def make_stamp_trigger(BaseClass: Type[DeclarativeBase]):
    schema = get_schema(BaseClass)
    table = get_tablename(BaseClass)
    schema_table = f'"{schema}"."{table}"'
    return PGTrigger(
        schema=schema,
        signature=f'{table}_updated_at',
        on_entity=schema_table,
        definition=(f'''
            BEFORE
                UPDATE
            ON
                {schema_table}
            FOR EACH
                ROW
            EXECUTE
                PROCEDURE "{schema}".update_stamp()
            ;
        '''),
    )

PGObject = PGFunction | PGTrigger

def to_sql_create(pg_obj: PGObject):
    return '\n'.join(
        str(s) for s in
        pg_obj.to_sql_statement_create_or_replace()
    ).strip()

def to_sql_drop(pg_obj: PGObject):
    return str(pg_obj.to_sql_statement_drop()).strip()

class BaseWithStamps(DeclarativeBase):
    def __init_subclass__(cls, *args, **kwargs):
        if has_table(cls):
            cls.created_at = cls._init_created_at()
            cls.updated_at = cls._init_updated_at()
        return super().__init_subclass__(*args, **kwargs)

    @staticmethod
    def _init_created_at() -> Mapped[datetime]:
        return mapped_column(
            types.TIMESTAMP(timezone=True),
            server_default=UtcNow()
        )

    @classmethod
    def _init_updated_at(cls) -> Mapped[datetime]:
        stamp_func = make_stamp_function(cls)
        stamp_trig = make_stamp_trigger(cls)
        on_after_create(cls, DDL(to_sql_create(stamp_func)))
        on_after_create(cls, DDL(to_sql_create(stamp_trig)))
        on_before_drop(cls, DDL(to_sql_drop(stamp_trig)))
        return mapped_column(
            types.DateTime(timezone=True),
            server_default=UtcNow(),
            server_onupdate=FetchedValue()
        )
You can use TIMESTAMP with SQLAlchemy.
from sqlalchemy import TIMESTAMP, Table, MetaData, Column, ...
... ellipsis ...
def function_name(self) -> Table:
    return Table(
        "table_name",
        self._metadata,
        ...,
        Column("date_time", TIMESTAMP),
    )
... ellipsis ...
It's better to use it this way:

from sqlalchemy import Column, DateTime
import datetime

# Pass the callable itself (no parentheses) so it is evaluated per row,
# not once when the module is imported.
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, onupdate=datetime.datetime.now)

Sorry for the previous version. Now it works perfectly for me:

created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)
Here is my full user model
import uuid
from app.configs.database import Base
from sqlalchemy import Column, String, Integer, DateTime, Enum
from app.utils.utils import ERoles
import datetime
class User(Base):
    __tablename__ = "Users"

    id = Column(Integer, primary_key=True, index=True)
    # pass a callable so a fresh UUID is generated per row, not once at import time
    uuid = Column(String(36), unique=True, default=lambda: str(uuid.uuid4()))
    name = Column(String(256), nullable=True)
    username = Column(String(256), unique=True)
    email = Column(String(256), unique=True)
    password = Column(String(256))
    role = Column(Enum(ERoles), default=ERoles.USER)
    # pass the functions themselves, not the result of calling them
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now,
                        onupdate=datetime.datetime.now)