This package provides a client to interact with OSCAR (https://oscar.grycap.net) clusters and services. It is available on PyPI under the name oscar-python.
options_basic_auth = {'cluster_id':'cluster-id',
                      'endpoint':'https://cluster-endpoint',
                      'user':'username',
                      'password':'password',
                      'ssl':'True'}

client = Client(options = options_basic_auth)
If you want to use OIDC tokens to authenticate with EGI Check-In, you can use the OIDC Agent to create an account configuration for the EGI issuer (https://aai.egi.eu/auth/realms/egi/) and then initialize the client with the shortname of your account, as follows.
options_oidc_auth = {'cluster_id':'cluster-id',
                     'endpoint':'https://cluster-endpoint',
                     'shortname':'oidc-agent-shortname',
                     'ssl':'True'}

client = Client(options = options_oidc_auth)
If you already have a valid token, you can use the parameter oidc_token instead.
options_oidc_auth = {'cluster_id':'cluster-id',
                     'endpoint':'https://cluster-endpoint',
                     'oidc_token':'token',
                     'ssl':'True'}

client = Client(options = options_oidc_auth)
A pre-generated token is useful, for example, on EGI Notebooks: since you cannot run oidc-agent inside the Notebook, you can make use of the token that EGI provides at the path /var/run/secrets/egi.eu/access_token.
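A minimal sketch of that scenario, assuming the token file is readable at that path and using placeholder values for the cluster id and endpoint, could look like this:

from oscar_python.client import Client

# read the access token that EGI provides inside the Notebook environment
with open("/var/run/secrets/egi.eu/access_token") as token_file:
    access_token = token_file.read().strip()

# 'cluster-id' and the endpoint are placeholders for your own cluster
options_oidc_auth = {'cluster_id':'cluster-id',
                     'endpoint':'https://cluster-endpoint',
                     'oidc_token': access_token,
                     'ssl':'True'}

client = Client(options = options_oidc_auth)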
If you have a valid refresh token (long-lived token), you can use the parameter refresh_token instead.
options_oidc_auth = {'cluster_id':'cluster-id',
                     'endpoint':'https://cluster-endpoint',
                     'refresh_token':'token',
                     'ssl':'True'}

client = Client(options = options_oidc_auth)
You can get a refresh token from EGI Check-In using the Token Portal.
If you use another OIDC provider, you must also provide the token_endpoint and scopes parameters (and the client_id of your OIDC client):
options_oidc_auth = {'cluster_id':'cluster-id',
                     'endpoint':'https://cluster-endpoint',
                     'refresh_token':'token',
                     'scopes': ["openid", "profile", "email"],
                     'token_endpoint': "http://issuer.com/token",
                     'client_id': "your_client_id",
                     'ssl':'True'}

client = Client(options = options_oidc_auth)
- Sample code that creates a client and gets information about the cluster
from oscar_python.client import Client

options_basic_auth = {'cluster_id':'cluster-id',
                      'endpoint':'https://cluster-endpoint',
                      'user':'username',
                      'password':'password',
                      'ssl':'True'}

client = Client(options = options_basic_auth)

# get the cluster information
try:
    info = client.get_cluster_info()
    print(info.text)
except Exception as err:
    print("Failed with: ", err)
- Sample code to create a simple service with the cowsay example and make a synchronous invocation.
from oscar_python.client import Client

options_basic_auth = {'cluster_id':'cluster-id',
                      'endpoint':'https://cluster-endpoint',
                      'user':'username',
                      'password':'password',
                      'ssl':'True'}

client = Client(options = options_basic_auth)

try:
    client.create_service("/absolute_path/cowsay.yaml")
    response = client.run_service("cowsay", input = '{"message": "Hi there"}')
    if response.status_code == 200:
        print(response.text)
except Exception as err:
    print("Failed with: ", err)
get_cluster_info
# get the cluster information
info = client.get_cluster_info() # returns an HTTP response or an HTTPError
get_cluster_config
# get the cluster config
config = client.get_cluster_config() # returns an HTTP response or an HTTPError
get_service
# get the definition of a service
service = client.get_service("service_name") # returns an HTTP response or an HTTPError
list_services
# get a list of all the deployed services
services = client.list_services() # returns an HTTP response or an HTTPError
Note: Both path_to_fdl and the script path inside the FDL must be absolute.
create_service
# create a service
err = client.create_service("path_to_fdl" | "JSON_definition") # returns nothing if the service is created or an error if something goes wrong
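As a usage sketch of the note above, you can resolve a relative FDL location to the absolute path that create_service expects (the file name is illustrative):

import os

# build the absolute path required by create_service (the FDL file name is illustrative)
fdl_path = os.path.abspath("cowsay.yaml")
client.create_service(fdl_path)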
update_service
# update a service
err = client.update_service("service_name", "path_to_fdl" | "JSON_definition") # returns nothing if the service is updated or an error if something goes wrong
remove_service
# remove a service
response = client.remove_service("service_name") # returns an HTTP response
run_service
input, output and timeout are optional parameters.
# make a synchronous execution
response = client.run_service("service_name", input="input", output="out.png", timeout=100) # returns an HTTP response

# make an asynchronous execution
response = client.run_service("service_name", input="input", async_call=True) # returns an HTTP response
get_job_logs
# get the logs of a job
logs = client.get_job_logs("service_name", "job_id") # returns an HTTP response
list_jobs
# get a list of the jobs in a service
log_list = client.list_jobs("service_name") # returns an HTTP response

# to get more jobs use the page parameter
log_list = client.list_jobs("service_name", page="token_to_next_page") # returns an HTTP response
remove_job
# remove a job of a service
response = client.remove_job("service_name", "job_id") # returns an HTTP response
remove_all_jobs
# remove all the jobs in a service
response = client.remove_all_jobs("service_name") # returns an HTTP response
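Putting the job methods together, a rough sketch of an asynchronous workflow could look like the following; the service name comes from the cowsay example above and the job id is a placeholder you would take from the job listing:

# launch an asynchronous execution of the cowsay service
response = client.run_service("cowsay", input='{"message": "Hi there"}', async_call=True)
print(response.status_code)

# list the jobs created for the service and inspect the raw response
jobs = client.list_jobs("cowsay")
print(jobs.text)

# with a job id taken from the listing, fetch its logs and clean it up
# logs = client.get_job_logs("cowsay", "job_id")
# client.remove_job("cowsay", "job_id")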
You can create a storage object to operate over the different storage providers defined on a service with the method create_storage_client. This constructor returns a storage object with methods to interact with the storage providers.
The default constructor, shown below, creates a provider to interact with the default MinIO instance using the user's credentials.
storage_service = client.create_storage_client() # returns a storage object
Additionally, if you need to interact with specific storage providers defined on a service, the constructor accepts an svc parameter where you can specify the service name from which to search for additional credentials.
storage_service = client.create_storage_client("service_name") # returns a storage object
Note: The storage_provider parameter on the storage methods follows the format [storage_provider_type].[storage_provider_name], where storage_provider_type is one of the supported storage providers (MinIO, S3, Onedata or WebDAV) and storage_provider_name is the identifier (e.g. minio.default).
list_files_from_path
This method returns a JSON with the file information, except for Onedata, for which it returns an HTTP response.
# get a list of the files in a path of one of the service's storage providers
files = storage_service.list_files_from_path("storage_provider", "remote_path") # returns JSON
upload_file
# upload a file from a local path to a remote path
response = storage_service.upload_file("storage_provider", "local_path", "remote_path")
download_file
# download a file from a remote path to a local path
response = storage_service.download_file("storage_provider", "local_path", "remote_path")
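As a usage example, here is a minimal sketch that uploads a file to the default MinIO provider, lists the remote path and downloads the file back; the provider identifier follows the note above, and the bucket path and file names are placeholders:

# create the storage client from the OSCAR client
storage_service = client.create_storage_client()

# upload a local file to a remote path on the default MinIO provider
storage_service.upload_file("minio.default", "local_dir/input.txt", "service-bucket/input/input.txt")

# list the contents of the remote path
files = storage_service.list_files_from_path("minio.default", "service-bucket/input")
print(files)

# download the file back to a different local path
storage_service.download_file("minio.default", "local_dir/copy_of_input.txt", "service-bucket/input/input.txt")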