My program (which will eventually be containerised) should be able to do the following:
- continuously listen for Deployments and DaemonSets in Kubernetes (2 threads)
- collect the info needed (like annotations)
- use the collected info to create objects in a remote database

I could use some peer review for the threading part, which currently looks like this:
```python
import _thread as thread  # "thread" was renamed "_thread" in Python 3
from queue import Queue

from kubernetes import client, watch


# first thread to watch for deployments
def watch_deployments():
    v1 = client.AppsV1Api()
    w = watch.Watch()
    last_seen_version = v1.list_deployment_for_all_namespaces().metadata.resource_version
    while True:
        for item in w.stream(v1.list_deployment_for_all_namespaces, pretty="pretty", resource_version=last_seen_version):
            _ = {item["object"].metadata.name: [item["object"].kind, item["object"].metadata.namespace,
                                                item["object"].metadata.resource_version,
                                                item["object"].metadata.annotations]}
            depl_lst.put(_)


# second thread to watch for daemonsets
def watch_daemonsets():
    v1 = client.AppsV1Api()
    w = watch.Watch()
    last_seen_version = v1.list_daemon_set_for_all_namespaces().metadata.resource_version
    while True:
        for item in w.stream(v1.list_daemon_set_for_all_namespaces, pretty="pretty", resource_version=last_seen_version):
            _ = {item["object"].metadata.name: [item["object"].kind, item["object"].metadata.namespace,
                                                item["object"].metadata.resource_version,
                                                item["object"].metadata.annotations]}
            depl_lst.put(_)


if __name__ == '__main__':
    current_obj = {}
    depl_lst = Queue()
    thread.start_new_thread(watch_deployments, ())
    thread.start_new_thread(watch_daemonsets, ())
    while True:
        for i in range(depl_lst.qsize()):
            current_obj = depl_lst.get()
            # add_object is the function that creates the item in the remote database, not listed here
            add_object()
            depl_lst.task_done()
```
It's the same approach as what is done here, except that post uses asyncio: https://medium.com/@sebgoa/kubernets-async-watches-b8fa8a7ebfd4
Restructuring and consolidation
Dealing with the API client instance
Instead of creating a new client.AppsV1Api instance in each separate thread, create the instance in the main thread and pass the shared API client to your thread targets:
```python
if __name__ == '__main__':
    ...
    v1 = client.AppsV1Api()
    thread.start_new_thread(watch_deployments, (v1,))
    thread.start_new_thread(watch_daemonsets, (v1,))
    ...
```
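As a side note, thread.start_new_thread is the legacy low-level interface (the thread module was renamed _thread in Python 3), and its args parameter must be a tuple - (v1) without a trailing comma is not one, a classic pitfall. The higher-level threading module is generally preferable. A minimal sketch with dummy watcher targets standing in for the real functions (the string "shared-api-client" is a placeholder for a client.AppsV1Api() instance):

```python
import threading

# Dummy targets standing in for watch_deployments / watch_daemonsets;
# in the real program each would receive the shared AppsV1Api instance.
def watch_deployments(v1, results):
    results.append(("deployments", v1))

def watch_daemonsets(v1, results):
    results.append(("daemonsets", v1))

v1 = "shared-api-client"  # placeholder for client.AppsV1Api()
results = []
threads = [
    threading.Thread(target=watch_deployments, args=(v1, results)),
    threading.Thread(target=watch_daemonsets, args=(v1, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

threading.Thread also gives you join(), names, and daemon flags for free, which the bare start_new_thread call does not.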
Both target functions watch_deployments and watch_daemonsets perform essentially the same set of actions and differ only in the specific v1.list_... routine. To eliminate the duplication, extract the common behaviour into a separate function, say _gather_watched_metadata, that accepts a particular v1.list_... function as a callable:
```python
def _gather_watched_metadata(list_func):
    w = watch.Watch()
    last_seen_version = list_func().metadata.resource_version
    while True:
        for item in w.stream(list_func, pretty="pretty", resource_version=last_seen_version):
            metadata = item["object"].metadata
            _ = {metadata.name: [item["object"].kind, metadata.namespace,
                                 metadata.resource_version,
                                 metadata.annotations]}
            depl_lst.put(_)
```
As can be seen in the function above, to avoid repeatedly indexing nested attributes like item["object"].metadata, it is worth extracting them into a variable once.
Now, both target functions become just:
```python
def watch_deployments(v1):
    _gather_watched_metadata(v1.list_deployment_for_all_namespaces)


def watch_daemonsets(v1):
    _gather_watched_metadata(v1.list_daemon_set_for_all_namespaces)
```
That can be shortened even further: both watch_... functions can be eliminated entirely, and you may run with just the _gather_watched_metadata function and the specific API routine passed as an argument:
```python
...
v1 = client.AppsV1Api()
thread.start_new_thread(_gather_watched_metadata, (v1.list_deployment_for_all_namespaces,))
thread.start_new_thread(_gather_watched_metadata, (v1.list_daemon_set_for_all_namespaces,))
...
```
Whether to apply this last "shortening" is up to you.
Consuming the depl_lst queue
Initiating a for loop with range in this context:
```python
while True:
    for i in range(depl_lst.qsize()):
        ...
```
is redundant, as it's enough to check whether the queue is not empty:
```python
while True:
    if not depl_lst.empty():
        current_obj = depl_lst.get()
```
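In fact, since queue.Queue.get() blocks by default until an item is available, even the empty() check can be dropped and the consumer needs no polling at all. A minimal runnable sketch of that pattern, using a hypothetical add_object stand-in (the real one writes to the remote database) and a None sentinel to shut down cleanly:

```python
import queue
import threading

# Hypothetical stand-in for the remote-database call (add_object in the post).
processed = []

def add_object(obj):
    processed.append(obj)

depl_lst = queue.Queue()

def consume():
    while True:
        current_obj = depl_lst.get()  # blocks until an item arrives - no polling
        if current_obj is None:       # sentinel value signals shutdown
            depl_lst.task_done()
            break
        add_object(current_obj)
        depl_lst.task_done()

worker = threading.Thread(target=consume)
worker.start()
for name in ("deploy-a", "ds-b"):
    depl_lst.put({name: []})
depl_lst.put(None)   # tell the consumer to exit
depl_lst.join()      # wait until every queued item has been handled
worker.join()
```

The blocking get() plus task_done()/join() pairing is the idiomatic producer-consumer shape for queue.Queue.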