CS3Client is a Python client for interacting with the CS3 (Cloud Sync&Share Storage) APIs. It allows users to seamlessly communicate with cloud storage services that support CS3 protocols, enabling file management, data transfer, and other cloud-based operations.
- Simple and easy-to-use API client for CS3 services.
- Support for common file operations (read, write, delete, rename, ...).
- Support for common lock operations (set lock, get lock, unlock, ...).
- Support for common share operations (create share, update share, delete share, ...).
- Support for common user operations (get user, find users, get user groups, ...).
- support for common group operations (get group, find group, has member, ...).
- Support for restoring files through checkpoints (restore file version, list checkpoints).
- Support for applications (open in app, list app providers).
- Authentication and authorization handling.
- Cross-platform compatibility.
- Detailed error handling and logging.
To install cs3client, you need to have Python 3.7+ installed. You can install the package via pip:
pip install cs3client
Alternatively, you can clone this repository and install manually:
git clone git@github.com:cs3org/cs3-python-client.git cd cs3-python-client pip install .
CS3Client can be configured by passing specific parameters when initializing the client through a ConfigParser instance.
host
chunk_size- 4194384grpc_timeout- 10http_timeout- 10tus_enabled- Falsessl_enabled- Falsessl_client_cert- Nonessl_client_key- Nonessl_ca_cert- Noneauth_client_id- Noneauth_login_type- "basic"lock_by_setting_attr- Falselock_not_impl- Falselock_expiration- 1800
[cs3client] # Required host = localhost:19000 # Optional, defaults to 4194304 chunk_size = 4194304 # Optional, defaults to 10 grpc_timeout = 10 # Optional, defaults to 10 http_timeout = 10 # Optional, defaults to True tus_enabled = False # Optional, defaults to True ssl_enabled = False # Optional, defaults to True ssl_verify = False # Optional, defaults to an empty string ssl_client_cert = test_client_cert # Optional, defaults to an empty string ssl_client_key = test_client_key # Optional, defaults to an empty string ssl_ca_cert = test_ca_cert # Optinal, defaults to an empty string auth_client_id = einstein # Optional (can also be set when instansiating the class) auth_client_secret = relativity # Optional, defaults to basic auth_login_type = basic # Optional, defaults to False lock_by_setting_attr = False # Optional, defaults to False lock_not_impl = False # Optional, defaults to 1800 lock_expiration = 1800
To use cs3client, you first need to import and configure it. Here's a simple example of how to set up and start using the client. For configuration see Configuration. For more in depth examples see cs3-python-client/examples/.
import logging import configparser from cs3client.cs3client import CS3Client from cs3client.auth import Auth config = configparser.ConfigParser() with open("default.conf") as fdef: config.read_file(fdef) log = logging.getLogger(__name__) client = CS3Client(config, "cs3client", log) auth = Auth(client) # Set the client id (can also be set in the config) auth.set_client_id("<your_client_id_here>") # Set client secret (can also be set in config) auth.set_client_secret("<your_client_secret_here>") # Checks if token is expired if not return ('x-access-token', <token>) # if expired, request a new token from reva auth_token = auth.get_token() # OR if you already have a reva token # Checks if token is expired if not return (x-access-token', <token>) # if expired, throws an AuthenticationException (so you can refresh your reva token) token = "<your_reva_token>" auth_token = Auth.check_token(token)
# mkdir directory_resource = Resource(abs_path=f"/eos/user/r/rwelande/test_directory") res = client.file.make_dir(auth.get_token(), directory_resource) # touchfile touch_resource = Resource(abs_path="/eos/user/r/rwelande/touch_file.txt") res = client.file.touch_file(auth.get_token(), touch_resource) # setxattr resource = Resource(abs_path="/eos/user/r/rwelande/text_file.txt") res = client.file.set_xattr(auth.get_token(), resource, "iop.wopi.lastwritetime", str(1720696124)) # rmxattr res = client.file.remove_xattr(auth.get_token(), resource, "iop.wopi.lastwritetime") # stat res = client.file.stat(auth.get_token(), resource) # removefile res = client.file.remove_file(auth.get_token(), touch_resource) # rename rename_resource = Resource(abs_path="/eos/user/r/rwelande/rename_file.txt") res = client.file.rename_file(auth.get_token(), resource, rename_resource) # writefile content = b"Hello World" size = len(content) res = client.file.write_file(auth.get_token(), rename_resource, content, size) # listdir list_directory_resource = Resource(abs_path="/eos/user/r/rwelande") res = client.file.list_dir(auth.get_token(), list_directory_resource) # readfile file_res = client.file.read_file(auth.get_token(), rename_resource)
WEBDAV_LOCK_PREFIX = 'opaquelocktoken:797356a8-0500-4ceb-a8a0-c94c8cde7eba' def encode_lock(lock): '''Generates the lock payload for the storage given the raw metadata''' if lock: return WEBDAV_LOCK_PREFIX + ' ' + b64encode(lock.encode()).decode() return None resource = Resource(abs_path="/eos/user/r/rwelande/lock_test.txt") # Set lock client.file.set_lock(auth_token, resource, app_name="a", lock_id=encode_lock("some_lock")) # Get lock res = client.file.get_lock(auth_token, resource) if res is not None: lock_id = res["lock_id"] print(res) # Unlock res = client.file.unlock(auth_token, resource, app_name="a", lock_id=lock_id) # Refresh lock client.file.set_lock(auth_token, resource, app_name="a", lock_id=encode_lock("some_lock")) res = client.file.refresh_lock( auth_token, resource, app_name="a", lock_id=encode_lock("new_lock"), existing_lock_id=lock_id ) if res is not None: print(res) res = client.file.get_lock(auth_token, resource) if res is not None: print(res)
# Create share # resource = Resource(abs_path="/eos/user/r/<some_username>/text.txt") resource_info = client.file.stat(auth.get_token(), resource) user = client.user.get_user_by_claim("username", "<some_username>") res = client.share.create_share(auth.get_token(), resource_info, user.id.opaque_id, user.id.idp, "EDITOR", "USER") # List existing shares # filter_list = [] filter = client.share.create_share_filter(resource_id=resource_info.id, filter_type="TYPE_RESOURCE_ID") filter_list.append(filter) filter = client.share.create_share_filter(share_state="SHARE_STATE_PENDING", filter_type="TYPE_STATE") filter_list.append(filter) res, _ = client.share.list_existing_shares(auth.get_token(), ) # Get share # share_id = "58" res = client.share.get_share(auth.get_token(), opaque_id=share_id) # update share # res = client.share.update_share(auth.get_token(), opaque_id=share_id, role="VIEWER") # remove share # res = client.share.remove_share(auth.get_token(), opaque_id=share_id) # List existing received shares # filter_list = [] filter = client.share.create_share_filter(share_state="SHARE_STATE_ACCEPTED", filter_type="TYPE_STATE") filter_list.append(filter) res, _ = client.share.list_received_existing_shares(auth.get_token()) # get received share # received_share = client.share.get_received_share(auth.get_token(), opaque_id=share_id) # update recieved share # res = client.share.update_received_share(auth.get_token(), received_share=received_share, state="SHARE_STATE_ACCEPTED") # create public share # res = client.share.create_public_share(auth.get_token(), resource_info, role="VIEWER") # list existing public shares # filter_list = [] filter = client.share.create_public_share_filter(resource_id=resource_info.id, filter_type="TYPE_RESOURCE_ID") filter_list.append(filter) res, _ = client.share.list_existing_public_shares(filter_list=filter_list) res = client.share.get_public_share(auth.get_token(), opaque_id=share_id, sign=True) # OR token = "<token>" # res = client.share.get_public_share(token=token, sign=True) # update public share # res = client.share.update_public_share(auth.get_token(), type="TYPE_PASSWORD", token=token, role="VIEWER", password="hello") # remove public share # res = client.share.remove_public_share(auth.get_token(), token=token)
# find_user res = client.user.find_users(auth.get_token(), "rwel") # get_user res = client.user.get_user("https://auth.cern.ch/auth/realms/cern", "asdoiqwe") # get_user_groups res = client.user.get_user_groups("https://auth.cern.ch/auth/realms/cern", "rwelande") # get_user_by_claim (mail) res = client.user.get_user_by_claim("mail", "rasmus.oscar.welander@cern.ch") # get_user_by_claim (username) res = client.user.get_user_by_claim("username", "rwelande")
# get_group_by_claim (username) res = client.group.get_group_by_claim(client.auth.get_token(), "username", "rwelande") # get_group res = client.group.get_group(client.auth.get_token(), "https://auth.cern.ch/auth/realms/cern", "asdoiqwe") # has_member res = client.group.has_member(client.auth.get_token(), "somegroup", "rwelande", "https://auth.cern.ch/auth/realms/cern") # get_members res = client.group.get_members(client.auth.get_token(), "somegroup", "https://auth.cern.ch/auth/realms/cern") # find_groups res = client.group.find_groups(client.auth.get_token(), "rwel")
# list_app_providers res = client.app.list_app_providers(auth.get_token()) # open_in_app resource = Resource(abs_path="/eos/user/r/rwelande/collabora.odt") res = client.app.open_in_app(auth.get_token(), resource)
# list file versions resource = Resource(abs_path="/eos/user/r/rwelande/test.md") res = client.checkpoint.list_file_versions(auth.get_token(), resource) # restore file version res = client.checkpoint.restore_file_version(auth.get_token(), resource, "1722936250.0569fa2f")
The documentation can be generated using sphinx
pip install sphinx
cd docs
make htmlpytest --cov-report term --cov=serc tests/
This project is licensed under the Apache 2.0 License. See the LICENSE file for more details.