@@ -36,6 +36,7 @@
from sortedcontainers import SortedList
import pyiceberg.expressions.parser as parser
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
@@ -104,6 +105,7 @@
TableRequirement,
TableUpdate,
UpdatesAndRequirements,
UpdateTableMetadata,
UpgradeFormatVersionUpdate,
update_table_metadata,
)
@@ -132,7 +134,8 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.utils.properties import property_as_bool, property_as_int
from pyiceberg.utils.retry import RetryConfig, run_with_retry
if TYPE_CHECKING:
import bodo.pandas as bd
@@ -243,12 +246,58 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
COMMIT_NUM_RETRIES = "commit.retry.num-retries"
COMMIT_NUM_RETRIES_DEFAULT = 4
COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100
COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute
COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes
COMMIT_NUM_STATUS_CHECKS = "commit.status-check.num-retries"
COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3
COMMIT_STATUS_CHECKS_MIN_WAIT_MS = "commit.status-check.min-wait-ms"
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT = 1000 # 1 second
COMMIT_STATUS_CHECKS_MAX_WAIT_MS = "commit.status-check.max-wait-ms"
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS = "commit.status-check.total-timeout-ms"
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes
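# All wait and timeout values above are in milliseconds; the retry and
# status-check defaults mirror the equivalent properties in the Java Iceberg
# implementation.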
class _StaticUpdate:
"""Wrapper for simple TableUpdates to make them retryable.
This class wraps TableUpdates that don't need regeneration on retry
(like SetPropertiesUpdate, RemovePropertiesUpdate, UpgradeFormatVersionUpdate).
"""
def __init__(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()):
self._updates = updates
self._requirements = requirements
def _reset_state(self) -> None:
"""No-op for static updates that don't cache metadata-derived state."""
def _commit(self) -> UpdatesAndRequirements:
"""Return the stored updates and requirements."""
return self._updates, self._requirements
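# Illustrative use of _StaticUpdate with hypothetical values (compare
# set_properties/remove_properties below): a fixed update tuple is staged once
# and replayed verbatim on every commit retry.
#
#     updates = (SetPropertiesUpdate(updates={"owner": "alice"}),)
#     txn._apply(updates, pending_update=_StaticUpdate(updates))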
class Transaction:
_table: Table
_autocommit: bool
_updates: tuple[TableUpdate, ...]
_requirements: tuple[TableRequirement, ...]
_pending_updates: list[_StaticUpdate | UpdateTableMetadata[Any]]
# NOTE: Whenever _updates is modified, _working_metadata must be updated via update_table_metadata()
_working_metadata: TableMetadata
def __init__(self, table: Table, autocommit: bool = False):
"""Open a transaction to stage and commit changes to a table.
@@ -261,10 +310,13 @@ def __init__(self, table: Table, autocommit: bool = False):
self._autocommit = autocommit
self._updates = ()
self._requirements = ()
self._pending_updates = []
self._working_metadata = table.metadata
@property
def table_metadata(self) -> TableMetadata:
return update_table_metadata(self._table.metadata, self._updates)
"""Return the current working metadata with all updates applied."""
return self._working_metadata
def __enter__(self) -> Transaction:
"""Start a transaction to update the table."""
@@ -275,19 +327,31 @@ def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException |
if exctype is None and excinst is None and exctb is None:
self.commit_transaction()
def _apply(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()) -> Transaction:
def _apply(
self,
updates: tuple[TableUpdate, ...],
requirements: tuple[TableRequirement, ...] = (),
pending_update: _StaticUpdate | UpdateTableMetadata[Any] | None = None,
) -> Transaction:
"""Check if the requirements are met, and applies the updates to the metadata."""
for requirement in requirements:
requirement.validate(self.table_metadata)
self._updates += updates
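# Apply only the new updates to the working metadata, instead of replaying the
# full update history on every table_metadata access.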
self._working_metadata = update_table_metadata(self._working_metadata, updates)
# For the requirements, it does not make sense to add a requirement more than once
# For example, you cannot assert that the current schema has two different IDs
existing_requirements = {type(requirement) for requirement in self._requirements}
existing_requirement_keys = {req.key() for req in self._requirements}
for new_requirement in requirements:
if type(new_requirement) not in existing_requirements:
self._requirements = self._requirements + (new_requirement,)
key = new_requirement.key()
if key not in existing_requirement_keys:
self._requirements += (new_requirement,)
existing_requirement_keys.add(key)
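# Remember the producer (if any) so the same change can be regenerated when the
# commit has to be retried.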
if pending_update is not None:
self._pending_updates.append(pending_update)
if self._autocommit:
self.commit_transaction()
@@ -316,7 +380,8 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")
if format_version > self.table_metadata.format_version:
return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))
updates = (UpgradeFormatVersionUpdate(format_version=format_version),)
return self._apply(updates, pending_update=_StaticUpdate(updates))
return self
@@ -334,8 +399,9 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
"""
if properties and kwargs:
raise ValueError("Cannot pass both properties and kwargs")
updates = properties or kwargs
return self._apply((SetPropertiesUpdate(updates=updates),))
props = properties or kwargs
updates = (SetPropertiesUpdate(updates=props),)
return self._apply(updates, pending_update=_StaticUpdate(updates))
def _set_ref_snapshot(
self,
@@ -928,7 +994,8 @@ def remove_properties(self, *removals: str) -> Transaction:
Returns:
The alter table builder.
"""
return self._apply((RemovePropertiesUpdate(removals=removals),))
updates = (RemovePropertiesUpdate(removals=removals),)
return self._apply(updates, pending_update=_StaticUpdate(updates))
def update_location(self, location: str) -> Transaction:
"""Set the new table location.
@@ -949,15 +1016,95 @@ def commit_transaction(self) -> Table:
"""
if len(self._updates) > 0:
self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
if self._pending_updates:
self._commit_with_retry()
else:
self._table._do_commit( # pylint: disable=W0212
updates=self._updates,
requirements=self._requirements,
)
self._updates = ()
self._requirements = ()
self._pending_updates = []
return self._table
def _commit_with_retry(self) -> None:
"""Commit transaction with retry logic.
On retry, refreshes the table metadata and regenerates the staged updates
from the pending update producers.
"""
properties = self._table.metadata.properties
max_attempts = property_as_int(properties, TableProperties.COMMIT_NUM_RETRIES)
min_wait_ms = property_as_int(properties, TableProperties.COMMIT_MIN_RETRY_WAIT_MS)
max_wait_ms = property_as_int(properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS)
total_timeout_ms = property_as_int(properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS)
retry_config = RetryConfig(
max_attempts=max_attempts if max_attempts is not None else TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
min_wait_ms=min_wait_ms if min_wait_ms is not None else TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
max_wait_ms=max_wait_ms if max_wait_ms is not None else TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
total_timeout_ms=total_timeout_ms
if total_timeout_ms is not None
else TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
)
first_attempt = True
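# The first attempt commits the updates already staged on this transaction;
# later attempts refresh the table and regenerate them via _reapply_updates().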
def do_commit() -> None:
nonlocal first_attempt
if first_attempt:
first_attempt = False
else:
self._reapply_updates()
self._table._do_commit( # pylint: disable=W0212
updates=self._updates,
requirements=self._requirements,
)
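# Retry only on CommitFailedException; any other exception propagates unchanged.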
run_with_retry(
task=do_commit,
config=retry_config,
retry_on=(CommitFailedException,),
)
def _reapply_updates(self) -> None:
"""Reapply all updates after refreshing table metadata.
This is called on retry to regenerate all pending updates
based on the latest table metadata, similar to Java's BaseTransaction.applyUpdates().
NOTE: When adding new cached properties to UpdateTableMetadata subclasses,
ensure they are cleared in _reset_state() to avoid stale data on retry.
"""
self._table.refresh()
self._updates = ()
self._requirements = ()
self._working_metadata = self._table.metadata
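# Replay every pending producer against the refreshed metadata, rebuilding
# _updates, _requirements, and _working_metadata from scratch.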
for pending_update in self._pending_updates:
pending_update._reset_state()
updates, requirements = pending_update._commit()
self._updates += updates
self._working_metadata = update_table_metadata(self._working_metadata, updates)
existing_requirement_keys: set[tuple[Any, ...]] = {req.key() for req in self._requirements}
for req in requirements:
key = req.key()
if key not in existing_requirement_keys:
self._requirements += (req,)
existing_requirement_keys.add(key)
self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
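# Illustrative end-to-end flow with hypothetical names and values, assuming `tbl`
# is a loaded Table whose catalog raises CommitFailedException on a conflicting
# concurrent commit:
#
#     with tbl.transaction() as txn:
#         txn.set_properties({"comment": "nightly refresh"})
#
# set_properties registers a _StaticUpdate, so on a conflict _commit_with_retry
# refreshes the table, calls _reapply_updates(), and commits again within the
# configured retry budget.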
class CreateTableTransaction(Transaction):
@@ -995,6 +1142,8 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:
SetPropertiesUpdate(updates=table_metadata.properties),
)
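# Apply the initial creation updates to the working metadata so table_metadata
# reflects them immediately.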
self._working_metadata = update_table_metadata(self._working_metadata, self._updates)
def __init__(self, table: StagedTable):
super().__init__(table, autocommit=False)
self._initial_changes(table.metadata)