Slings from a data source to a data target.
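Install the package, optionally with the `arrow` extra for streaming:

```bash
pip install sling
# or, with Arrow support for streaming (quoted for shells such as zsh)
pip install "sling[arrow]"
```

Then you should be able to run `sling --help` from the command line.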
| Variable | Description | Default |
|---|---|---|
| `SLING_PYTHON_USE_SHELL` | Set to `true` to run the sling binary with `shell=True` in subprocess calls. | `false` |
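The README does not say exactly which strings count as "true", so the sketch below assumes a case-insensitive `true` check and shows what the flag changes in a `subprocess.run` call. `use_shell_from_env` and `subprocess_args` are hypothetical helpers for illustration, not part of the sling package.

```python
import os
import shlex
from typing import List, Mapping, Optional, Tuple, Union


def use_shell_from_env(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical: read SLING_PYTHON_USE_SHELL as a boolean.

    Assumption: only "true" (any case) enables shell mode; unset or any
    other value falls back to the documented default of false.
    """
    env = os.environ if environ is None else environ
    return env.get("SLING_PYTHON_USE_SHELL", "false").strip().lower() == "true"


def subprocess_args(
    argv: List[str], environ: Optional[Mapping[str, str]] = None
) -> Tuple[Union[str, List[str]], bool]:
    """Return (command, shell) for subprocess.run(command, shell=shell)."""
    if use_shell_from_env(environ):
        # shell=True expects a single command string (POSIX quoting here)
        return shlex.join(argv), True
    # Default: pass the argument list directly, no shell involved
    return list(argv), False
```

Usage: `command, shell = subprocess_args(["sling", "--help"])`, then `subprocess.run(command, shell=shell)`. The list form avoids shell parsing and quoting entirely, which is a good reason to leave the default at `false` unless your environment needs a shell.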
```bash
sling run --src-conn MY_PG --src-stream myschema.mytable \
  --tgt-conn YOUR_SNOWFLAKE --tgt-object yourschema.yourtable \
  --mode full-refresh
```
Or pass a YAML/JSON string or file:
```bash
cat <<'EOF' > /path/to/replication.yaml
source: MY_POSTGRES
target: MY_SNOWFLAKE

# default config options which apply to all streams
defaults:
  mode: full-refresh
  object: new_schema.{stream_schema}_{stream_table}

streams:
  my_schema.*:
EOF

sling run -r /path/to/replication.yaml
```
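Because `json.dumps` output is also valid YAML, the same replication can be generated from Python. The sketch below builds it as a dict and adds an illustrative `expand_object` helper (hypothetical, not a sling API) showing which target name the `object` template yields for one table matched by `my_schema.*`. Sling performs the real substitution itself at runtime, and the table name `orders` is made up.

```python
import json

# The replication above as a plain dict
replication = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    # default config options which apply to all streams
    "defaults": {
        "mode": "full-refresh",
        "object": "new_schema.{stream_schema}_{stream_table}",
    },
    # `my_schema.*:` with no value is null in YAML, i.e. None here
    "streams": {"my_schema.*": None},
}


def expand_object(template: str, stream: str) -> str:
    """Illustrative only: fill {stream_schema}/{stream_table} for one stream.

    Sling does this itself (and the target database may change the case of
    the final name); this only shows the naming pattern.
    """
    schema, table = stream.split(".", 1)
    return template.format(stream_schema=schema, stream_table=table)


def to_json(config: dict) -> str:
    """Serialize the config; the text works as a .json or .yaml file."""
    return json.dumps(config, indent=2)
```

For a hypothetical table `my_schema.orders`, `expand_object(replication["defaults"]["object"], "my_schema.orders")` gives `new_schema.my_schema_orders`.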
Run a replication from file:
```python
import yaml
from sling import Replication

# From a YAML file
replication = Replication(file_path="path/to/replication.yaml")
replication.run()

# Or load into object
with open('path/to/replication.yaml') as file:
    config = yaml.load(file, Loader=yaml.FullLoader)

replication = Replication(**config)
replication.run()
```
Build a replication dynamically:
```python
from sling import Replication, ReplicationStream, Mode

# (folder, table_name) pairs to load; placeholder values
folders = [
    ("path/to/folder_a/", "my_schema.table_a"),
    ("path/to/folder_b/", "my_schema.table_b"),
]

# build sling replication
streams = {}
for folder, table_name in folders:
    streams[folder] = ReplicationStream(
        mode=Mode.FULL_REFRESH,
        object=table_name,
        primary_key='_hash_id',
    )

replication = Replication(
    source='aws_s3',
    target='snowflake',
    streams=streams,
    env=dict(SLING_STREAM_URL_COLUMN='true', SLING_LOADED_AT_COLUMN='true'),
    debug=True,
)
replication.run()
```
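Since `Replication(**config)` accepts the same structure as the YAML file, the stream loop can also emit plain dicts, for example when one job generates the stream list and another runs it. `build_stream_configs` is a hypothetical helper; the keys `mode` and `object` come from the YAML example, while `primary_key` and `env` are assumed to match the replication YAML format. It also rejects duplicate folders, which a plain dict would otherwise silently overwrite.

```python
from typing import Dict, Iterable, Tuple


def build_stream_configs(
    folders: Iterable[Tuple[str, str]],
    mode: str = "full-refresh",
    primary_key: str = "_hash_id",
) -> Dict[str, dict]:
    """Map (folder, table_name) pairs to YAML-style stream entries."""
    streams: Dict[str, dict] = {}
    for folder, table_name in folders:
        if folder in streams:
            # A dict would keep only the last entry; fail loudly instead
            raise ValueError(f"duplicate stream: {folder}")
        streams[folder] = {"mode": mode, "object": table_name, "primary_key": primary_key}
    return streams


# Placeholder inputs
folders = [
    ("path/to/folder_a/", "my_schema.table_a"),
    ("path/to/folder_b/", "my_schema.table_b"),
]

config = {
    "source": "aws_s3",
    "target": "snowflake",
    "streams": build_stream_configs(folders),
    "env": {"SLING_STREAM_URL_COLUMN": "true", "SLING_LOADED_AT_COLUMN": "true"},
}
# With sling installed: Replication(**config).run()
```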
For more direct control and streaming capabilities, you can use the Sling class, which mirrors the CLI interface.
```python
import os
from sling import Sling, Mode

# Set postgres & snowflake connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'
os.environ["SNOWFLAKE"] = 'snowflake://...'

# Database to database transfer
Sling(
    src_conn="postgres",
    src_stream="public.users",
    tgt_conn="snowflake",
    tgt_object="public.users_copy",
    mode=Mode.FULL_REFRESH
).run()

# Database to file
Sling(
    src_conn="postgres",
    src_stream="select * from users where active = true",
    tgt_object="file:///tmp/active_users.csv"
).run()

# File to database
Sling(
    src_stream="file:///path/to/data.csv",
    tgt_conn="snowflake",
    tgt_object="public.imported_data"
).run()
```
**Tip:** Install with `pip install sling[arrow]` for better streaming performance and improved data type handling.
**DataFrame Support:** The `input` parameter accepts lists of dictionaries, pandas DataFrames, or polars DataFrames. DataFrame support preserves data types when using the Arrow format.
**Note:** Be careful with large numbers of `Sling` invocations using `input` or `stream()` methods when working with external systems (databases, file systems). Each call re-opens the connection since it invokes the underlying sling binary. For better performance and connection reuse, consider using the `Replication` class instead, which maintains open connections across multiple operations.
```python
import os
from sling import Sling, Format

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Stream Python data to CSV file
data = [
    {"id": 1, "name": "John", "age": 30},
    {"id": 2, "name": "Jane", "age": 25},
    {"id": 3, "name": "Bob", "age": 35}
]

Sling(
    input=data,
    tgt_object="file:///tmp/output.csv"
).run()

# Stream Python data to database
Sling(
    input=data,
    tgt_conn="postgres",
    tgt_object="public.users"
).run()

# Stream Python data to JSON Lines file
Sling(
    input=data,
    tgt_object="file:///tmp/output.jsonl",
    tgt_options={"format": Format.JSONLINES}
).run()

# Stream from generator (memory efficient for large datasets)
def data_generator():
    for i in range(10000):
        yield {"id": i, "value": f"item_{i}", "timestamp": "2023-01-01"}

Sling(input=data_generator(), tgt_object="file:///tmp/large_dataset.csv").run()

# Stream pandas DataFrame to database
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "name": ["Alice", "Bob", "Charlie", "Diana"],
    "age": [25, 30, 35, 28],
    "salary": [50000, 60000, 70000, 55000]
})

Sling(
    input=df,
    tgt_conn="postgres",
    tgt_object="public.employees"
).run()

# Stream polars DataFrame to CSV file
import polars as pl

df = pl.DataFrame({
    "product_id": [101, 102, 103],
    "product_name": ["Laptop", "Mouse", "Keyboard"],
    "price": [999.99, 25.50, 75.00],
    "in_stock": [True, False, True]
})

Sling(
    input=df,
    tgt_object="file:///tmp/products.csv"
).run()

# DataFrame with column selection
Sling(
    input=df,
    select=["product_name", "price"],  # Only export specific columns
    tgt_object="file:///tmp/product_prices.csv"
).run()
```
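Following the connection-reuse note above: when data arrives in many small batches (pages from an API, for example), one way to avoid starting the sling binary once per batch is to flatten the batches into a single generator and make one `Sling(input=...)` call, since `input` accepts generators as shown above. `combine_batches` and `fetch_pages` are hypothetical helpers, and all batches are assumed to share the same columns.

```python
from itertools import chain
from typing import Dict, Iterable, Iterator, List

Record = Dict[str, object]


def combine_batches(batches: Iterable[List[Record]]) -> Iterator[Record]:
    """Lazily flatten many small batches into one record stream.

    Nothing is materialized up front, so memory use stays at one batch.
    """
    return chain.from_iterable(batches)


def fetch_pages(n_pages: int, page_size: int) -> Iterator[List[Record]]:
    """Hypothetical producer standing in for, e.g., an API client."""
    for page in range(n_pages):
        yield [{"id": page * page_size + i, "page": page} for i in range(page_size)]


# Instead of one Sling call per page:
#   for page in fetch_pages(50, 200):
#       Sling(input=page, tgt_conn="postgres", tgt_object="public.events").run()
# make a single call over the combined stream:
#   Sling(input=combine_batches(fetch_pages(50, 200)),
#         tgt_conn="postgres", tgt_object="public.events").run()
```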
```python
import os
from sling import Sling

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Stream data from database
sling = Sling(
    src_conn="postgres",
    src_stream="public.users",
    limit=1000
)

for record in sling.stream():
    print(f"User: {record['name']}, Age: {record['age']}")

# Stream data from file
sling = Sling(
    src_stream="file:///path/to/data.csv"
)

def transform_record(record):
    # Placeholder transformation: lower-case the column names
    return {key.lower(): value for key, value in record.items()}

# Process records one by one (memory efficient)
for record in sling.stream():
    processed_data = transform_record(record)
    # Could save to another system, send to API, etc.

# Stream with parameters
sling = Sling(
    src_conn="postgres",
    src_stream="public.orders",
    select=["order_id", "customer_name", "total"],
    where="total > 100",
    limit=500
)

records = list(sling.stream())
print(f"Found {len(records)} high-value orders")
```
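Records from `stream()` pass through CSV serialization, so numbers and booleans may arrive as strings (the `stream_arrow()` notes further down make the same point). If you need typed values without Arrow, a small per-column cast is one option. The sketch uses the standard `csv` module to reproduce the string round trip and a hypothetical `cast_record` helper to restore types; treating empty strings as NULL is an assumption.

```python
import csv
import io
from typing import Any, Callable, Dict

Schema = Dict[str, Callable[[str], Any]]


def parse_bool(value: str) -> bool:
    """Accept common textual booleans such as 'true', 'True', 't', '1'."""
    return value.strip().lower() in {"true", "t", "1", "yes"}


def cast_record(record: Dict[str, Any], schema: Schema) -> Dict[str, Any]:
    """Return a copy of record with string fields converted via schema.

    Columns not in the schema, and values that are not strings (already
    typed, or None), pass through unchanged. Empty strings become None.
    """
    out: Dict[str, Any] = {}
    for key, value in record.items():
        caster = schema.get(key)
        if caster is None or not isinstance(value, str):
            out[key] = value
        elif value == "":
            out[key] = None
        else:
            out[key] = caster(value)
    return out


# Reproduce a CSV round trip with the standard library: every value comes back as str
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["id", "name", "score", "active"])
writer.writeheader()
writer.writerow({"id": 1, "name": "Alice", "score": 95.5, "active": True})
buf.seek(0)
row = next(csv.DictReader(buf))  # {'id': '1', 'name': 'Alice', 'score': '95.5', 'active': 'True'}

typed = cast_record(row, {"id": int, "score": float, "active": parse_bool})
# With sling: for record in sling.stream(): typed = cast_record(record, schema)
```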
**Performance:** The `stream_arrow()` method provides the highest-performance streaming, with full data type preservation, by using Apache Arrow's columnar format. Requires `pip install sling[arrow]`.
**Type Safety:** Unlike `stream()`, which may convert data types during CSV serialization, `stream_arrow()` preserves exact data types, including integers, floats, timestamps, and more.
```python
import os
from sling import Sling

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Basic Arrow streaming from database
sling = Sling(src_conn="postgres", src_stream="public.users", limit=1000)

# Get Arrow RecordBatchStreamReader for maximum performance
reader = sling.stream_arrow()

# Convert to Arrow Table for analysis
table = reader.read_all()
print(f"Received {table.num_rows} rows with {table.num_columns} columns")
print(f"Column names: {table.column_names}")
print(f"Schema: {table.schema}")

# Convert to pandas DataFrame with preserved types
if table.num_rows > 0:
    df = table.to_pandas()
    print(df.dtypes)  # Shows preserved data types

# Stream Arrow file with type preservation
sling = Sling(
    src_stream="file:///path/to/data.arrow",
    src_options={"format": "arrow"}
)
reader = sling.stream_arrow()
table = reader.read_all()

# Access columnar data directly (very efficient)
for column_name in table.column_names:
    column = table.column(column_name)
    print(f"{column_name}: {column.type}")

# Process Arrow batches for large datasets (memory efficient)
sling = Sling(
    src_conn="postgres",
    src_stream="select * from large_table"
)
reader = sling.stream_arrow()
for batch in reader:
    # Process each batch separately to manage memory
    print(f"Processing batch with {batch.num_rows} rows")
    # Convert batch to pandas if needed
    batch_df = batch.to_pandas()
    # Process batch_df...

# Round-trip with Arrow format preservation
import pandas as pd

# Write DataFrame to Arrow file with type preservation
df = pd.DataFrame({
    "id": [1, 2, 3],
    "amount": [100.50, 250.75, 75.25],
    "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
    "active": [True, False, True]
})

Sling(
    input=df,
    tgt_object="file:///tmp/data.arrow",
    tgt_options={"format": "arrow"}
).run()

# Read back with full type preservation
sling = Sling(
    src_stream="file:///tmp/data.arrow",
    src_options={"format": "arrow"}
)
reader = sling.stream_arrow()
restored_table = reader.read_all()
restored_df = restored_table.to_pandas()

# Types are exactly preserved (no string conversion)
print(restored_df.dtypes)
assert restored_df['active'].dtype == 'bool'
assert 'datetime64' in str(restored_df['timestamp'].dtype)
```
Notes:

- `stream_arrow()` requires PyArrow: `pip install sling[arrow]`
- Cannot be used with a target object (use `run()` instead)
- Provides the best performance for large datasets
- Preserves exact data types, including timestamps, decimals, and booleans
- Ideal for analytics workloads and data science applications
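In the batch loop of the Arrow example, memory stays bounded only if each batch is reduced to a small running state instead of being accumulated. The sketch below folds one numeric column into count/sum/min/max across batches. To run without pyarrow, each batch here is a plain column-name → list dict, the shape `RecordBatch.to_pydict()` returns; `RunningStats` and `aggregate_column` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass
class RunningStats:
    """Count/sum/min/max for one numeric column, updated batch by batch."""
    count: int = 0
    total: float = 0.0
    minimum: Optional[float] = None
    maximum: Optional[float] = None

    def update(self, values: Iterable[Optional[float]]) -> None:
        for v in values:
            if v is None:  # skip nulls
                continue
            self.count += 1
            self.total += v
            self.minimum = v if self.minimum is None else min(self.minimum, v)
            self.maximum = v if self.maximum is None else max(self.maximum, v)

    @property
    def mean(self) -> Optional[float]:
        return self.total / self.count if self.count else None


def aggregate_column(batches: Iterable[Dict[str, List]], column: str) -> RunningStats:
    """Fold batches (column name -> list of values) into running stats."""
    stats = RunningStats()
    for batch in batches:
        stats.update(batch[column])
    return stats
```

With real Arrow batches this would look like `aggregate_column((b.to_pydict() for b in sling.stream_arrow()), "amount")`; only one batch plus the four counters is held in memory at a time.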
```python
import os
from sling import Sling

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Python → File → Python
original_data = [
    {"id": 1, "name": "Alice", "score": 95.5},
    {"id": 2, "name": "Bob", "score": 87.2}
]

# Step 1: Python data to file
sling_write = Sling(
    input=original_data,
    tgt_object="file:///tmp/scores.csv"
)
sling_write.run()

# Step 2: File back to Python
sling_read = Sling(
    src_stream="file:///tmp/scores.csv"
)
loaded_data = list(sling_read.stream())

# Python → Database → Python (with transformations)
sling_to_db = Sling(
    input=original_data,
    tgt_conn="postgres",
    tgt_object="public.temp_scores"
)
sling_to_db.run()

sling_from_db = Sling(
    src_conn="postgres",
    src_stream="select *, score * 1.1 as boosted_score from public.temp_scores",
)
transformed_data = list(sling_from_db.stream())

# DataFrame → Database → DataFrame (with pandas/polars)
import pandas as pd

# Start with pandas DataFrame
df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "purchase_amount": [100.50, 250.75, 75.25],
    "category": ["electronics", "clothing", "books"]
})

# Write DataFrame to database
Sling(
    input=df,
    tgt_conn="postgres",
    tgt_object="public.purchases"
).run()

# Read back with SQL transformations as pandas DataFrame
sling_query = Sling(
    src_conn="postgres",
    src_stream="""
        SELECT category, COUNT(*) as purchase_count, AVG(purchase_amount) as avg_amount
        FROM public.purchases
        GROUP BY category
    """
)

summary_data = list(sling_query.stream())
summary_df = pd.DataFrame(summary_data)
print(summary_df)
```
Run a Pipeline:
```python
from sling import Pipeline
from sling.hooks import StepLog, StepCopy, StepReplication, StepHTTP, StepCommand

# From a YAML file
pipeline = Pipeline(file_path="path/to/pipeline.yaml")
pipeline.run()

# Or using Hook objects for type safety
pipeline = Pipeline(
    steps=[
        StepLog(message="Hello world"),
        StepCopy(from_="sftp//path/to/file", to="aws_s3/path/to/file"),
        StepReplication(path="path/to/replication.yaml"),
        StepHTTP(url="https://trigger.webhook.com"),
        StepCommand(command=["ls", "-l"], print_output=True)
    ],
    env={"MY_VAR": "value"}
)
pipeline.run()

# Or programmatically using dictionaries
pipeline = Pipeline(
    steps=[
        {"type": "log", "message": "Hello world"},
        {"type": "copy", "from": "sftp//path/to/file", "to": "aws_s3/path/to/file"},
        {"type": "replication", "path": "path/to/replication.yaml"},
        {"type": "http", "url": "https://trigger.webhook.com"},
        {"type": "command", "command": ["ls", "-l"], "print": True}
    ],
    env={"MY_VAR": "value"}
)
pipeline.run()
```
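The dictionary form has no type checking, so a typo such as a missing `to` on a copy step only surfaces when the pipeline runs. A quick pre-flight check is sketched below. The required keys per step type are an assumption drawn only from the examples above, not Sling's full step schema, so unknown step types are let through unchecked; `check_steps` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Required keys for the step types used in the example above (assumption,
# not Sling's complete schema)
REQUIRED_KEYS = {
    "log": {"message"},
    "copy": {"from", "to"},
    "replication": {"path"},
    "http": {"url"},
    "command": {"command"},
}


def check_steps(steps: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems found in dict-form pipeline steps."""
    problems: List[str] = []
    for i, step in enumerate(steps):
        step_type = step.get("type")
        if step_type is None:
            problems.append(f"step {i}: missing 'type'")
            continue
        # Unknown types map to an empty set, so they are not flagged
        missing = REQUIRED_KEYS.get(step_type, set()) - set(step)
        for key in sorted(missing):
            problems.append(f"step {i} ({step_type}): missing '{key}'")
    return problems
```

Calling `check_steps(steps)` before `Pipeline(steps=steps)` and raising on a non-empty result keeps the failure close to where the steps were built.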
Build API Spec YAML files programmatically with type checking and validation. API specs define how Sling extracts data from REST APIs.
```python
from sling.api_spec import (
    ApiSpec, Endpoint, Request, Pagination, Response, Records,
    Processor, Rule, Iterate, Call, DynamicEndpoint,
    AuthType, HTTPMethod, RuleAction, AggregationType, BackoffType, ResponseFormat,
)

spec = ApiSpec(
    name="My API",
    description="Extract data from My API",
    queues=["user_ids"],
    defaults=Endpoint(
        state={"base_url": "https://api.example.com/v1", "limit": 100},
        request=Request(
            headers={
                "Authorization": 'Bearer {require(secrets.api_key, "api_key required")}',
                "Accept": "application/json",
            },
            rate=5,
            concurrency=3,
        ),
        response=Response(
            records=Records(jmespath="data[]", primary_key=["id"]),
            rules=[
                Rule(
                    action=RuleAction.RETRY,
                    condition="response.status == 429",
                    max_attempts=5,
                    backoff=BackoffType.EXPONENTIAL,
                    backoff_base=2,
                ),
            ],
        ),
        pagination=Pagination(
            next_state={"offset": "{state.offset + state.limit}"},
            stop_condition="length(response.records) < state.limit",
        ),
    ),
    endpoints={
        "users": Endpoint(
            description="List all users",
            state={"offset": 0},
            request=Request(
                url="{state.base_url}/users",
                parameters={"limit": "{state.limit}", "offset": "{state.offset}"},
            ),
            response=Response(
                processors=[
                    Processor(expression="record.id", output="queue.user_ids"),
                ],
            ),
        ),
        "user_orders": Endpoint(
            description="Get orders for each user",
            iterate=Iterate(over="queue.user_ids", into="state.user_id", concurrency=5),
            request=Request(url="{state.base_url}/users/{state.user_id}/orders"),
            response=Response(
                processors=[
                    Processor(expression="state.user_id", output="record.user_id"),
                ],
            ),
        ),
        "metrics": Endpoint(
            description="Daily metrics (incremental)",
            state={
                "offset": 0,
                "since": '{coalesce(sync.last_date, date_format(date_add(now(), -30, "day"), "%Y-%m-%d"))}',
            },
            sync=["last_date"],
            request=Request(
                url="{state.base_url}/metrics",
                parameters={"since": "{state.since}"},
            ),
            response=Response(
                records=Records(primary_key=["id"], update_key="date"),
                processors=[
                    Processor(
                        expression="record.date",
                        output="state.last_date",
                        aggregation=AggregationType.MAXIMUM,
                    ),
                ],
            ),
        ),
    },
)

# Validate
errors = spec.validate()
assert errors == [], errors

# Write to file
spec.to_yaml_file("my_api.yaml")

# Or get as string
print(spec.to_yaml())
print(spec.to_json())
```
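To make the spec above concrete, here is a plain-Python model of two behaviors it declares: offset pagination from `defaults` (`next_state` adds `limit` to `offset`; `stop_condition` ends when a page has fewer than `limit` records) and the `metrics` endpoint's incremental window (`since` falls back to 30 days ago when `sync.last_date` is unset, and the MAXIMUM of `record.date` becomes the next `last_date`). This is a simplified illustration of the declared semantics, not Sling's engine: rate limits, concurrency and retry rules are ignored, and the `fetch` callable is a stand-in for the HTTP request.

```python
from datetime import date, timedelta
from typing import Callable, Dict, List, Optional

Record = Dict[str, object]


def paginate(fetch: Callable[[int, int], List[Record]], limit: int = 100) -> List[Record]:
    """Collect all records using offset pagination.

    next_state:     offset = offset + limit
    stop_condition: len(page) < limit
    """
    offset = 0
    records: List[Record] = []
    while True:
        page = fetch(offset, limit)
        records.extend(page)
        if len(page) < limit:  # stop_condition
            return records
        offset += limit  # next_state


def since_value(last_date: Optional[str], today: date) -> str:
    """coalesce(sync.last_date, today - 30 days) formatted as %Y-%m-%d."""
    if last_date is not None:
        return last_date
    return (today - timedelta(days=30)).strftime("%Y-%m-%d")


def max_date(records: List[Record], current: Optional[str] = None) -> Optional[str]:
    """MAXIMUM aggregation of record.date, carried into state.last_date.

    ISO-8601 dates (YYYY-MM-DD) sort correctly as plain strings.
    """
    best = current
    for rec in records:
        value = rec.get("date")
        if value is not None and (best is None or value > best):
            best = value
    return best
```

For example, with 250 records and `limit=100`, `paginate` requests offsets 0, 100 and 200 and stops after the third page returns 50 records. If the total is an exact multiple of `limit`, one extra request returns an empty page before the loop stops; that extra call is inherent to this stop condition.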
Parse an existing spec:
```python
from sling.api_spec import ApiSpec, Endpoint, Request, Response, Records

spec = ApiSpec.parse_file("path/to/spec.yaml")
print(spec.name)
print(list(spec.endpoints.keys()))

# Modify and re-export
spec.endpoints["new_endpoint"] = Endpoint(
    request=Request(url="{state.base_url}/new"),
    response=Response(records=Records(primary_key=["id"])),
)
spec.to_yaml_file("updated_spec.yaml")
```
Use +rules/+processors modifiers to append to defaults without replacing them:
```python
from sling.api_spec import Endpoint, Request, Response, Rule, RuleAction

endpoint = Endpoint(
    request=Request(url="{state.base_url}/fragile"),
    response=Response(
        # append_rules serializes as "rules+" in YAML, keeping default rules intact
        append_rules=[Rule(action=RuleAction.SKIP, condition="response.status == 404")],
    ),
)
```
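One way to picture the difference between setting `rules` on an endpoint and using the append form is the sketch below: plain `rules` replaces the defaults, while the append form adds to whichever list is in effect. Rules are represented as plain strings, and `effective_rules` is a hypothetical helper; modeling both on one endpoint as replace-then-append is an assumption, not documented Sling behavior.

```python
from typing import List, Optional


def effective_rules(
    default_rules: List[str],
    rules: Optional[List[str]] = None,
    append_rules: Optional[List[str]] = None,
) -> List[str]:
    """Illustrative merge of endpoint rules with the spec's defaults.

    - rules (if given) replaces the default list entirely
    - append_rules ("rules+") is added after whichever list is in effect
    Inputs are copied, so the defaults are never mutated.
    """
    base = list(default_rules) if rules is None else list(rules)
    return base + list(append_rules or [])
```

With defaults of `["retry on 429"]`, appending `["skip on 404"]` yields both rules in that order, whereas setting `rules=["skip on 404"]` drops the retry rule for that endpoint.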
The package lives in the inner sling/ directory and ships a pyproject.toml
with a test dependency group. The
recommended way to run the tests is with uv,
which builds the package and installs all test dependencies (pytest,
pytest-mock, pyarrow, pandas, polars) into an isolated .venv:
```bash
cd sling

# Install the package + test dependencies into .venv
uv sync --group test

# Run the suite
uv run python -m pytest tests/tests.py -v
uv run python -m pytest tests/test_connection.py -v
uv run python -m pytest tests/test_api_spec.py -v
uv run python -m pytest tests/test_columns_type_casting.py -v

# test_sling_class.py is run with Arrow on and off
SLING_USE_ARROW=false uv run python -m pytest tests/test_sling_class.py -v
SLING_USE_ARROW=true uv run python -m pytest tests/test_sling_class.py -v
```
To test against a specific sling binary instead of the auto-downloaded
release (e.g. a local build or a dev build), point SLING_BINARY at it:
```bash
SLING_BINARY=/path/to/sling uv run python -m pytest tests/test_sling_class.py -v
```
The CI workflow (`test-cli` in the `sling` repo) uses exactly this flow (`uv sync --group test` + `uv run pytest`) against the latest dev build, across Linux, Mac and Windows.
If you prefer plain pip/pytest:
```bash
pip install -e sling
pip install pytest pytest-mock pyarrow pandas polars
pytest sling/tests/tests.py -v
pytest sling/tests/test_sling_class.py -v
```
To Login:
```bash
mcp-publisher login dns --domain slingdata.io --private-key $(openssl pkey -in mcp-key.pem -noout -text | grep -A3 "priv:" | tail -n +2 | tr -d ' :\n')
```
To Publish:
```bash
# to publish, adjust the version first in server.json
mcp-publisher publish

# check
curl "https://registry.modelcontextprotocol.io/v0/servers?search=io.slingdata/sling-cli"
```
mcp-name: io.slingdata/sling-cli