Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: Add configurable commit retry logic for snapshot and transaction operations #2794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
KazuhitoT wants to merge 12 commits into apache:main
base: main
Choose a base branch
Loading
from KazuhitoT:commit-retry2
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 160 additions & 11 deletions pyiceberg/table/__init__.py
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from sortedcontainers import SortedList

import pyiceberg.expressions.parser as parser
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
Expand Down Expand Up @@ -104,6 +105,7 @@
TableRequirement,
TableUpdate,
UpdatesAndRequirements,
UpdateTableMetadata,
UpgradeFormatVersionUpdate,
update_table_metadata,
)
Expand Down Expand Up @@ -132,7 +134,8 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.utils.properties import property_as_bool, property_as_int
from pyiceberg.utils.retry import RetryConfig, run_with_retry

if TYPE_CHECKING:
import bodo.pandas as bd
Expand Down Expand Up @@ -243,12 +246,58 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1

COMMIT_NUM_RETRIES = "commit.retry.num-retries"
COMMIT_NUM_RETRIES_DEFAULT = 4

COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100

COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute

COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes

COMMIT_NUM_STATUS_CHECKS = "commit.status-check.num-retries"
COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3

COMMIT_STATUS_CHECKS_MIN_WAIT_MS = "commit.status-check.min-wait-ms"
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT = 1000 # 1 second

COMMIT_STATUS_CHECKS_MAX_WAIT_MS = "commit.status-check.max-wait-ms"
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute

COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS = "commit.status-check.total-timeout-ms"
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes


class _StaticUpdate:
"""Wrapper for simple TableUpdates to make them retryable.

This class wraps TableUpdates that don't need regeneration on retry
(like SetPropertiesUpdate, RemovePropertiesUpdate, UpgradeFormatVersionUpdate).
"""

def __init__(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()):
self._updates = updates
self._requirements = requirements

def _reset_state(self) -> None:
"""No-op for static updates that don't cache metadata-derived state."""

def _commit(self) -> UpdatesAndRequirements:
"""Return the stored updates and requirements."""
return self._updates, self._requirements


class Transaction:
_table: Table
_autocommit: bool
_updates: tuple[TableUpdate, ...]
_requirements: tuple[TableRequirement, ...]
_pending_updates: list[_StaticUpdate | UpdateTableMetadata[Any]]
# NOTE: Whenever _updates is modified, _working_metadata must be updated via update_table_metadata()
_working_metadata: TableMetadata

def __init__(self, table: Table, autocommit: bool = False):
"""Open a transaction to stage and commit changes to a table.
Expand All @@ -261,10 +310,13 @@ def __init__(self, table: Table, autocommit: bool = False):
self._autocommit = autocommit
self._updates = ()
self._requirements = ()
self._pending_updates = []
self._working_metadata = table.metadata

@property
def table_metadata(self) -> TableMetadata:
return update_table_metadata(self._table.metadata, self._updates)
"""Return the current working metadata with all updates applied."""
return self._working_metadata

def __enter__(self) -> Transaction:
"""Start a transaction to update the table."""
Expand All @@ -275,19 +327,31 @@ def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException |
if exctype is None and excinst is None and exctb is None:
self.commit_transaction()

def _apply(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()) -> Transaction:
def _apply(
self,
updates: tuple[TableUpdate, ...],
requirements: tuple[TableRequirement, ...] = (),
pending_update: _StaticUpdate | UpdateTableMetadata[Any] | None = None,
) -> Transaction:
"""Check if the requirements are met, and applies the updates to the metadata."""
for requirement in requirements:
requirement.validate(self.table_metadata)

self._updates += updates

self._working_metadata = update_table_metadata(self._working_metadata, updates)

# For the requirements, it does not make sense to add a requirement more than once
# For example, you cannot assert that the current schema has two different IDs
existing_requirements = {type(requirement) for requirement in self._requirements}
existing_requirement_keys = {req.key() for req in self._requirements}
for new_requirement in requirements:
if type(new_requirement) not in existing_requirements:
self._requirements = self._requirements + (new_requirement,)
key = new_requirement.key()
if key not in existing_requirement_keys:
self._requirements += (new_requirement,)
existing_requirement_keys.add(key)

if pending_update is not None:
self._pending_updates.append(pending_update)

if self._autocommit:
self.commit_transaction()
Expand Down Expand Up @@ -316,7 +380,8 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

if format_version > self.table_metadata.format_version:
return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))
updates = (UpgradeFormatVersionUpdate(format_version=format_version),)
return self._apply(updates, pending_update=_StaticUpdate(updates))

return self

Expand All @@ -334,8 +399,9 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
"""
if properties and kwargs:
raise ValueError("Cannot pass both properties and kwargs")
updates = properties or kwargs
return self._apply((SetPropertiesUpdate(updates=updates),))
props = properties or kwargs
updates = (SetPropertiesUpdate(updates=props),)
return self._apply(updates, pending_update=_StaticUpdate(updates))

def _set_ref_snapshot(
self,
Expand Down Expand Up @@ -928,7 +994,8 @@ def remove_properties(self, *removals: str) -> Transaction:
Returns:
The alter table builder.
"""
return self._apply((RemovePropertiesUpdate(removals=removals),))
updates = (RemovePropertiesUpdate(removals=removals),)
return self._apply(updates, pending_update=_StaticUpdate(updates))

def update_location(self, location: str) -> Transaction:
"""Set the new table location.
Expand All @@ -949,15 +1016,95 @@ def commit_transaction(self) -> Table:
"""
if len(self._updates) > 0:
self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)

if self._pending_updates:
self._commit_with_retry()
else:
self._table._do_commit( # pylint: disable=W0212
updates=self._updates,
requirements=self._requirements,
)

self._updates = ()
self._requirements = ()
self._pending_updates = []

return self._table

def _commit_with_retry(self) -> None:
"""Commit transaction with retry logic.

On retry, refreshes table metadata and regenerates snapshots from
the pending snapshot producers.
"""
properties = self._table.metadata.properties

max_attempts = property_as_int(properties, TableProperties.COMMIT_NUM_RETRIES)
min_wait_ms = property_as_int(properties, TableProperties.COMMIT_MIN_RETRY_WAIT_MS)
max_wait_ms = property_as_int(properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS)
total_timeout_ms = property_as_int(properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS)

retry_config = RetryConfig(
max_attempts=max_attempts if max_attempts is not None else TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
min_wait_ms=min_wait_ms if min_wait_ms is not None else TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
max_wait_ms=max_wait_ms if max_wait_ms is not None else TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
total_timeout_ms=total_timeout_ms
if total_timeout_ms is not None
else TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
)

first_attempt = True

def do_commit() -> None:
nonlocal first_attempt
if first_attempt:
first_attempt = False
else:
self._reapply_updates()

self._table._do_commit( # pylint: disable=W0212
updates=self._updates,
requirements=self._requirements,
)

run_with_retry(
task=do_commit,
config=retry_config,
retry_on=(CommitFailedException,),
)

def _reapply_updates(self) -> None:
"""Reapply all updates after refreshing table metadata.

This is called on retry to regenerate all pending updates
based on the latest table metadata, similar to Java's BaseTransaction.applyUpdates().

NOTE: When adding new cached properties to UpdateTableMetadata subclasses,
ensure they are cleared in _reset_state() to avoid stale data on retry.
"""
self._table.refresh()

self._updates = ()
self._requirements = ()
self._working_metadata = self._table.metadata

return self._table
for pending_update in self._pending_updates:
pending_update._reset_state()
updates, requirements = pending_update._commit()
self._updates += updates
self._working_metadata = update_table_metadata(self._working_metadata, updates)

existing_requirement_keys: set[tuple[Any, ...]] = set()
existing_req: TableRequirement
for existing_req in self._requirements:
existing_requirement_keys.add(existing_req.key())
for req in requirements:
key = req.key()
if key not in existing_requirement_keys:
self._requirements += (req,)
existing_requirement_keys.add(key)

self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)


class CreateTableTransaction(Transaction):
Expand Down Expand Up @@ -995,6 +1142,8 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:
SetPropertiesUpdate(updates=table_metadata.properties),
)

self._working_metadata = update_table_metadata(self._working_metadata, self._updates)

def __init__(self, table: StagedTable):
super().__init__(table, autocommit=False)
self._initial_changes(table.metadata)
Expand Down
24 changes: 22 additions & 2 deletions pyiceberg/table/update/__init__.py
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,23 @@ def __init__(self, transaction: Transaction) -> None:
self._transaction = transaction

@abstractmethod
def _commit(self) -> UpdatesAndRequirements: ...
def _commit(self) -> UpdatesAndRequirements:
"""Generate the table updates and requirements for this operation."""
...

@abstractmethod
def _reset_state(self) -> None:
"""Reset internal state for retry after table metadata refresh.

This is called by Transaction._reapply_updates() when retrying after a
CommitFailedException. Implementations should rebuild any cached state
from self._transaction.table_metadata.
"""
...

def commit(self) -> None:
self._transaction._apply(*self._commit())
updates, requirements = self._commit()
self._transaction._apply(updates, requirements, pending_update=self)

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
"""Close and commit the change."""
Expand Down Expand Up @@ -753,6 +766,10 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
"""
...

def key(self) -> tuple[Any, ...]:
"""Return a deduplication key for this requirement."""
return (type(self),)


class AssertCreate(ValidatableTableRequirement):
"""The table must not already exist; used for create transactions."""
Expand Down Expand Up @@ -811,6 +828,9 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
elif self.snapshot_id is not None:
raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")

def key(self) -> tuple[Any, ...]:
return (type(self), self.ref)


class AssertLastAssignedFieldId(ValidatableTableRequirement):
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""
Expand Down
Loading

AltStyle によって変換されたページ (->オリジナル) /