A comprehensive Python client library for Unstract APIHUB services that provides clean, Pythonic interfaces for multiple document processing APIs including table extraction, document splitting, and generic document processing with dynamic endpoints.
- Multi-Client Architecture: Three specialized clients for different use cases
  - ApiHubClient: Table extraction and discovery APIs
  - DocSplitterClient: Document splitting and chunking services
  - GenericUnstractClient: Dynamic endpoint processing (invoice, contract, receipt, etc.)
- File Processing: Support for document processing with file uploads across all clients
- Status Monitoring: Track processing status with polling capabilities
- Error Handling: Comprehensive exception handling with meaningful messages
- Flexible Parameters: Support for custom parameters and configurations
- Automatic Polling: Optional wait-for-completion functionality
- Type Safety: Full type hints for better development experience
- Batch Processing: Built-in support for processing multiple documents
- Integration Ready: Easy integration between different client services
```bash
pip install apihub-python-client
```
Or install from source:
```bash
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client
pip install -e .
```
```python
from apihub_client import ApiHubClient

# Initialize the client
client = ApiHubClient(
    api_key="your-api-key-here",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)

# Process a document with automatic completion waiting
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="statement.pdf",
    wait_for_completion=True,
    polling_interval=3  # Check status every 3 seconds
)

print("Processing completed!")
print(result)
```
Split documents into smaller parts using the doc-splitter service:
```python
from apihub_client import DocSplitterClient

# Initialize the doc-splitter client
doc_client = DocSplitterClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005"
)

# Simple upload and wait for completion
result = doc_client.upload(
    file_path="large_document.pdf",
    wait_for_completion=True,
    polling_interval=5  # Check status every 5 seconds
)

# Download the split result
output_file = doc_client.download_result(
    job_id=result["job_id"],
    output_path="split_result.zip"
)
print(f"Downloaded result to: {output_file}")
```
```python
# Step 1: Upload document
upload_result = doc_client.upload(file_path="document.pdf")
job_id = upload_result["job_id"]
print(f"Upload completed. Job ID: {job_id}")

# Step 2: Monitor status manually
status = doc_client.get_job_status(job_id)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (with custom timeout)
final_result = doc_client.wait_for_completion(
    job_id=job_id,
    timeout=600,        # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)

# Step 4: Download the processed result
downloaded_file = doc_client.download_result(
    job_id=job_id,
    output_path="processed_document.zip"
)
print(f"Processing complete! Downloaded: {downloaded_file}")
```
```python
from pathlib import Path

def process_documents_batch(file_paths):
    """Process multiple documents with doc-splitter."""
    results = []

    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")

            # Upload and wait for completion
            result = doc_client.upload(
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5
            )

            # Generate output filename
            input_name = Path(file_path).stem
            output_path = f"{input_name}_split.zip"

            # Download result
            downloaded_file = doc_client.download_result(
                job_id=result["job_id"],
                output_path=output_path
            )

            results.append({
                "input": file_path,
                "output": downloaded_file,
                "job_id": result["job_id"],
                "success": True
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False
            })

    return results

# Process multiple files
files = ["document1.pdf", "document2.pdf", "document3.pdf"]
results = process_documents_batch(files)

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
```
Process documents using dynamic endpoints like invoice, contract, receipt, etc.:
```python
from apihub_client import GenericUnstractClient

# Initialize the generic client
client = GenericUnstractClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005"
)

# Simple processing with automatic completion waiting
result = client.process(
    endpoint="invoice",
    file_path="invoice.pdf",
    wait_for_completion=True,
    polling_interval=5  # Check status every 5 seconds
)
print("Invoice processing completed:", result)
```
```python
# Step 1: Start processing
process_result = client.process(
    endpoint="contract",
    file_path="contract.pdf"
)
execution_id = process_result["execution_id"]
print(f"Processing started. Execution ID: {execution_id}")

# Step 2: Check status manually
status = client.check_status("contract", execution_id)
print(f"Current status: {status}")

# Step 3: Wait for completion (with custom timeout)
final_result = client.wait_for_completion(
    endpoint="contract",
    execution_id=execution_id,
    timeout=600,        # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)

# Step 4: Get the result later (if needed)
result = client.get_result("contract", execution_id)
print("Processing complete:", result)
```
```python
def process_documents_batch(endpoint, file_paths):
    """Process multiple documents with the same endpoint."""
    results = []

    for file_path in file_paths:
        try:
            print(f"Processing {file_path} with {endpoint} endpoint...")

            # Process and wait for completion
            result = client.process(
                endpoint=endpoint,
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5
            )

            results.append({
                "input": file_path,
                "execution_id": result["execution_id"],
                "result": result,
                "success": True
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False
            })

    return results

# Process multiple invoices
invoice_files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
results = process_documents_batch("invoice", invoice_files)

# Process multiple contracts
contract_files = ["contract1.pdf", "contract2.pdf"]
contract_results = process_documents_batch("contract", contract_files)

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
```
Combine doc-splitter with extraction APIs for complete document processing:
```python
from apihub_client import ApiHubClient, DocSplitterClient

# Initialize both clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)

# Step 1: Split the large document
split_result = doc_splitter.upload(
    file_path="large_contract.pdf",
    wait_for_completion=True
)

# Step 2: Download the split result
doc_splitter.download_result(
    job_id=split_result["job_id"],
    output_path="split_documents.zip"
)

# Step 3: Process individual documents (example with one document,
# assuming you extract individual PDFs from the zip)
table_result = api_client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="individual_page.pdf",
    wait_for_completion=True
)
print("Extracted data:", table_result)
```
```python
from apihub_client import ApiHubClient, DocSplitterClient, GenericUnstractClient

# Initialize all clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)
generic_client = GenericUnstractClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)

# Workflow: Split → Extract → Process with Generic API

# Step 1: Split the large document
split_result = doc_splitter.upload(
    file_path="large_document.pdf",
    wait_for_completion=True
)

# Step 2: Extract tables from the split documents
# (after extracting individual files from the zip)
table_result = api_client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="split_page_1.pdf",
    wait_for_completion=True
)

# Step 3: Process with the generic invoice API
invoice_result = generic_client.process(
    endpoint="invoice",
    file_path="split_page_2.pdf",
    wait_for_completion=True
)

print("Complete workflow finished!")
print("Tables extracted:", len(table_result.get("data", [])))
print("Invoice processed:", invoice_result.get("execution_id"))
```
```python
import json

# Step 1: Discover tables in the uploaded PDF
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    ext_cache_result="true",
    ext_cache_text="true",
    file_path="statement.pdf"
)

file_hash = initial_result.get("file_hash")
print("File hash:", file_hash)

discover_tables_result = client.wait_for_complete(
    file_hash,
    timeout=600,        # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)
tables = json.loads(discover_tables_result["data"])
print(f"Total tables in this document: {len(tables)}")

all_table_result = []

# Step 2: Extract each discovered table
for i, table in enumerate(tables):
    table_result = client.extract(
        endpoint="extract_table",
        vertical="table",
        sub_vertical="extract_table",
        file_hash=file_hash,
        ext_table_no=i,  # Extract the nth table
        wait_for_completion=True
    )
    print(f"Extracted table: {table['table_name']}")
    all_table_result.append({table["table_name"]: table_result})

print("All table results:")
print(all_table_result)
```
```python
# Process a bank statement
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="bank_statement.pdf",
    wait_for_completion=True,
    polling_interval=3
)
print("Bank statement processed:", result)
```
```python
# Step 1: Start processing
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="document.pdf"
)
file_hash = initial_result["file_hash"]
print(f"Processing started with hash: {file_hash}")

# Step 2: Monitor status
status = client.get_status(file_hash)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (using the wait_for_complete method)
final_result = client.wait_for_complete(
    file_hash=file_hash,
    timeout=600,        # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)
print("Final result:", final_result)
```
Once a file has been processed, you can reuse it by file hash:
```python
# Run a different operation on the same file
table_result = client.extract(
    endpoint="extract_table",
    vertical="table",
    sub_vertical="extract_table",
    file_hash="previously-obtained-hash",
    ext_table_no=1,  # Extract the second table; indexing starts at 0
    wait_for_completion=True
)
```
Create a .env file:
```
API_KEY=your_api_key_here
BASE_URL=https://api.example.com
LOG_LEVEL=INFO
```
Then load in your code:
```python
import os

from dotenv import load_dotenv

from apihub_client import ApiHubClient

load_dotenv()

client = ApiHubClient(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL")
)
```
The main client class for interacting with the ApiHub service.
```python
client = ApiHubClient(api_key: str, base_url: str)
```
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the ApiHub service
Client for interacting with doc-splitter APIs for document splitting operations.
```python
doc_client = DocSplitterClient(api_key: str, base_url: str)
```
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the doc-splitter service
Client for interacting with generic Unstract APIs using dynamic endpoints.
```python
generic_client = GenericUnstractClient(api_key: str, base_url: str)
```
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the Unstract service
Start a document processing operation.
```python
extract(
    endpoint: str,
    vertical: str,
    sub_vertical: str,
    file_path: str | None = None,
    file_hash: str | None = None,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    **kwargs
) -> dict
```
Parameters:
- `endpoint` (str): The API endpoint to call (e.g., "discover_tables", "extract_table")
- `vertical` (str): The processing vertical
- `sub_vertical` (str): The processing sub-vertical
- `file_path` (str, optional): Path to a file to upload (for new files)
- `file_hash` (str, optional): Hash of a previously uploaded file (for cached operations)
- `wait_for_completion` (bool): If True, polls until completion and returns the final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
- `**kwargs`: Additional parameters specific to the endpoint
Returns:
dict: API response containing processing results or file hash for tracking
Check the status of a processing job.
```python
get_status(file_hash: str) -> dict
```
Parameters:
- `file_hash` (str): The file hash returned from extract()
Returns:
dict: Status information including current processing state
Get the final results of a completed processing job.
```python
retrieve(file_hash: str) -> dict
```
Parameters:
- `file_hash` (str): The file hash of the completed job
Returns:
dict: Final processing results
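The retrieve method is not shown in the quick-start examples above, so here is a minimal sketch of fetching the output of a job that already finished (the file hash below is a placeholder):

```python
# Minimal sketch: fetch results for a job known to be complete,
# e.g. from an earlier session. The hash value is a placeholder.
final_data = client.retrieve(file_hash="previously-obtained-hash")
print(final_data)
```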
Wait for a processing job to complete by polling its status.
```python
wait_for_complete(
    file_hash: str,
    timeout: int = 600,
    polling_interval: int = 3
) -> dict
```
Parameters:
- `file_hash` (str): The file hash of the job to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
dict: Final processing results when completed
Raises:
ApiHubClientException: If processing fails or times out
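A minimal sketch of handling a failure or timeout (the file hash is a placeholder):

```python
from apihub_client import ApiHubClientException

try:
    result = client.wait_for_complete(
        file_hash="previously-obtained-hash",
        timeout=120,        # Give up after 2 minutes
        polling_interval=3,
    )
except ApiHubClientException as e:
    # Raised when the job fails or the timeout elapses
    print(f"Job did not complete: {e.message}")
```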
Upload a document for splitting.
```python
upload(
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
) -> dict
```
Parameters:
- `file_path` (str): Path to the file to upload
- `wait_for_completion` (bool): If True, polls until completion and returns the final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
Returns:
dict: Response containing job_id and status information
Check the status of a splitting job.
```python
get_job_status(job_id: str) -> dict
```
Parameters:
- `job_id` (str): The job ID to check status for
Returns:
dict: Status information including current processing state
Download the result of a completed splitting job.
```python
download_result(
    job_id: str,
    output_path: str | None = None
) -> str
```
Parameters:
- `job_id` (str): The job ID to download results for
- `output_path` (str, optional): Path where the downloaded file is saved. If None, uses `result_{job_id}.zip`
Returns:
str: Path to the downloaded file
Wait for a splitting job to complete by polling its status.
```python
wait_for_completion(
    job_id: str,
    timeout: int = 600,
    polling_interval: int = 3
) -> dict
```
Parameters:
- `job_id` (str): The job ID to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
dict: Final job status information when completed
Raises:
ApiHubClientException: If processing fails or times out
Process a document using the specified endpoint.
```python
process(
    endpoint: str,
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    timeout: int = 600,
) -> dict
```
Parameters:
- `endpoint` (str): The endpoint name (e.g., 'invoice', 'contract', 'receipt')
- `file_path` (str): Path to the file to upload
- `wait_for_completion` (bool): If True, polls until completion and returns the final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
- `timeout` (int): Maximum time to wait for completion in seconds (default: 600)
Returns:
dict: Response containing execution_id and processing information
Get the result of a processing operation.
```python
get_result(endpoint: str, execution_id: str) -> dict
```
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to get results for
Returns:
dict: Processing result or status information
Wait for a processing operation to complete by polling its status.
```python
wait_for_completion(
    endpoint: str,
    execution_id: str,
    timeout: int = 600,
    polling_interval: int = 3,
) -> dict
```
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
dict: Final processing result when completed
Check the current status of a processing operation.
```python
check_status(endpoint: str, execution_id: str) -> str | None
```
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to check status for
Returns:
str | None: Current status string, or None if not available
Raises:
ApiHubClientException: If processing fails or times out
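Because the return value may be None before a status is available, a manual polling loop should handle that case. A minimal sketch (the terminal status strings below are assumptions; check the values your deployment returns):

```python
import time

# Minimal sketch of manual polling with check_status().
# execution_id comes from an earlier generic_client.process() call.
while True:
    status = generic_client.check_status("invoice", execution_id)
    if status is None:
        print("No status available yet")
    elif status.lower() in ("completed", "failed"):  # assumed terminal states
        print(f"Terminal status reached: {status}")
        break
    time.sleep(3)  # Pause between checks to avoid hammering the API
```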
All clients (ApiHubClient, DocSplitterClient, and GenericUnstractClient) use the same exception handling:
```python
from apihub_client import ApiHubClientException, GenericUnstractClient

generic_client = GenericUnstractClient(api_key="key", base_url="http://localhost:8005")

try:
    result = generic_client.process(
        endpoint="invoice",
        file_path="invoice.pdf",
        wait_for_completion=True
    )
    print("Processing completed:", result["execution_id"])
except ApiHubClientException as e:
    print(f"Error: {e.message}")
    print(f"Status Code: {e.status_code}")
```
```python
from apihub_client import ApiHubClientException

# `client` is the ApiHubClient initialized earlier

def process_documents(file_paths, endpoint):
    results = []

    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")

            # Start processing
            initial_result = client.extract(
                endpoint=endpoint,
                vertical="table",
                sub_vertical=endpoint,
                file_path=file_path
            )

            # Wait for completion with custom settings
            result = client.wait_for_complete(
                file_hash=initial_result["file_hash"],
                timeout=900,        # 15 minutes for batch processing
                polling_interval=5  # Less frequent polling for batches
            )
            results.append({"file": file_path, "result": result, "success": True})
        except ApiHubClientException as e:
            print(f"Failed to process {file_path}: {e.message}")
            results.append({"file": file_path, "error": str(e), "success": False})

    return results

# Process multiple files
file_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
results = process_documents(file_paths, "bank_statement")

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
```
Run the test suite:
```bash
# Install development dependencies
pip install -e ".[dev]"

# Run all tests
pytest

# Run tests with coverage
pytest --cov=apihub_client --cov-report=html

# Run specific test files
pytest test/test_client.py -v
pytest test/test_integration.py -v
```
For integration tests with a real API:
```bash
# Create a .env file with real credentials
cp .env.example .env
# Edit .env with your API credentials

# Run integration tests
pytest test/test_integration.py -v
```
Enable debug logging to see detailed request/response information:
```python
import logging

from apihub_client import ApiHubClient

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

client = ApiHubClient(api_key="your-key", base_url="https://api.example.com")

# All API calls will now emit detailed logs
result = client.extract(...)
```
This project uses automated releases through GitHub Actions with PyPI Trusted Publishers for secure publishing.
- Go to GitHub Actions → "Release Tag and Publish Package"
- Click "Run workflow" and configure:
  - Version bump: `patch` (bug fixes), `minor` (new features), or `major` (breaking changes)
  - Pre-release: Check for beta/alpha versions
  - Release notes: Optional custom notes
- Click "Run workflow" - the automation handles the rest!
The workflow will automatically:
- Update version in the code
- Create Git tags and GitHub releases
- Run all tests and quality checks
- Publish to PyPI using `uv publish` with Trusted Publishers
For more details, see Release Documentation.
We welcome contributions! Please see our Contributing Guide for details.
```bash
# Clone the repository
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client

# Install dependencies using uv (required - do not use pip)
uv sync

# Install pre-commit hooks
uv run --frozen pre-commit install

# Run tests
uv run --frozen pytest

# Run linting and formatting
uv run --frozen ruff check .
uv run --frozen ruff format .

# Run type checking
uv run --frozen mypy src/

# Run all pre-commit hooks manually
uv run --frozen pre-commit run --all-files
```
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: GitHub Issues
- Documentation: Check this README and inline code documentation
- Examples: See the `examples/` directory for more usage patterns
- Initial release
- Basic client functionality with extract, status, and retrieve operations
- File upload support
- Automatic polling with wait_for_completion
- Comprehensive test suite
Made with ❤️ by the Unstract team