1
\$\begingroup\$

My program (which eventually will be containerised) should be able to do the following:

  • continuously listening for deployment and daemonsets in kubernetes (2 threads)
  • collect info needed (like annotations)
  • use the info collected in order to create objects in a remote database

I could use some peer review for the thread part, which looks like this currently:

#first thread to watch for deployments
def watch_deployments():
 v1 = client.AppsV1Api()
 w = watch.Watch()
 last_seen_version = v1.list_deployment_for_all_namespaces().metadata.resource_version
 while True:
 for item in w.stream(v1.list_deployment_for_all_namespaces, pretty="pretty", resource_version=last_seen_version):
 _ = {item["object"].metadata.name:[item["object"].kind, item["object"].metadata.namespace, item["object"].metadata.resource_version, item["object"].metadata.annotations]}
 depl_lst.put(_)
def watch_daemonsets():
 v1 = client.AppsV1Api()
 w = watch.Watch()
 last_seen_version = v1.list_daemon_set_for_all_namespaces().metadata.resource_version
 while True:
 for item in w.stream(v1.list_daemon_set_for_all_namespaces, pretty="pretty", resource_version=last_seen_version):
 _ = {item["object"].metadata.name:[item["object"].kind, item["object"].metadata.namespace, item["object"].metadata.resource_version, item["object"].metadata.annotations]}
 depl_lst.put(_)
if __name__ == '__main__':
 current_obj = {}
 depl_lst = Queue()
 thread.start_new_thread(watch_deployments, ())
 thread.start_new_thread(watch_daemonsets, ())
 while True:
 for i in range(depl_lst.qsize()):
 current_obj = depl_lst.get()
 #add object is the function in order to create the item in the remote database, not listed here
 add_object()
 depl_lst.task_done()

Its the same thing with what is being done here but in this case, the asyncio was being used: https://medium.com/@sebgoa/kubernets-async-watches-b8fa8a7ebfd4

Graipher
41.6k7 gold badges70 silver badges134 bronze badges
asked Dec 5, 2019 at 12:16
\$\endgroup\$

1 Answer 1

1
\$\begingroup\$

Restructuring and consolidation

Dealing with API client instance
Instead of generating a new client.AppsV1Api instance in each separate thread - create the instance in the main thread and pass shared API client to your threads constructors:

if __name__ == '__main__':
 ...
 v1 = client.AppsV1Api()
 thread.start_new_thread(watch_deployments, (v1,))
 thread.start_new_thread(watch_daemonsets, (v1))
 ...

Both target functions watch_deployments and watch_daemonsets perform essentially the same set of actions and differ only in specific v1.list_... routine.
To eliminate duplication the common behavior is extracted into a separate function say _gather_watched_metadata that will accept a particular v1.list_... function object as callable:

def _gather_watched_metadata(list_func):
 w = watch.Watch()
 last_seen_version = list_func().metadata.resource_version
 while True:
 for item in w.stream(list_func, pretty="pretty", resource_version=last_seen_version):
 metadata = item["object"].metadata
 _ = {metadata.name: [item["object"].kind, metadata.namespace,
 metadata.resource_version,
 metadata.annotations]}
 depl_lst.put(_)

As can be seen in the above function, to avoid repetitive indexing of nested attributes like item["object"].metadata it is worth to extract it into a variable at once.

Now, both target functions become just as below:

def watch_deployments(v1):
 _gather_watched_metadata(v1.list_deployment_for_all_namespaces)
def watch_daemonsets(v1):
 _gather_watched_metadata(v1.list_daemon_set_for_all_namespaces)

That can be shortened even further: both watch_... functions can be eliminated and you may run just with _gather_watched_metadata function and specific API routines passed as an argument :

...
v1 = client.AppsV1Api()
thread.start_new_thread(_gather_watched_metadata, (v1.list_deployment_for_all_namespaces,))
thread.start_new_thread(_gather_watched_metadata, (v1.list_daemon_set_for_all_namespaces,))

But to apply the last "shortening" - is up to you ...


Consuming depl_lst queue
Initiating for loop with range in this context:

while True:
 for i in range(depl_lst.qsize()):
 ...

is redundant as it's enough to check if queue is not empty:

while True:
 if not depl_lst.empty():
 current_obj = depl_lst.get()
answered Dec 5, 2019 at 13:40
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.