
I have a requirement to watch a directory on any kind of filesystem and move a resource whenever it is ready (i.e. no longer being written to). To do this I have the following code:

from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, CancelledError, Future, sleep
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
from os.path import split
from queue import Queue
from typing import Any, Callable, Dict, List, Optional
from uuid import uuid4

from asyncpool import AsyncPool
from watchdog.events import FileSystemEventHandler, FileSystemEvent as FSEvent
from watchdog.observers.polling import PollingObserver

# My own types (FilesystemEventType, IFileSystem, ILogger, IPathHasher, FileLocked, NotFound)
# are defined in modules not shown here.

class FileSystemResourceType(Enum):
    UNKNOWN = auto()
    DIRECTORY = auto()
    FILE = auto()
    CHARACTER = auto()
    BLOCK_SPECIAL_FILE = auto()
    FIFO = auto()
    SOCKET = auto()
    SYMLINK = auto()

@dataclass
class FileSystemResource:
    """Represents some object/resource on some filesystem.

    Depending on the filesystem, any datetime value may be None as some filesystems do not provide any way to
    retrieve these values.

    Args:
        path: The path to the resource
        name: The name of the resource (eg /path/to/file.png, name=file.png)
        is_dir: Whether the resource is a directory
        size: The size of the resource in bytes
        type: The type of resource
        accessed: The last time the file was accessed
        created: The time the resource was created
        metadata_changed: The last time the resource's metadata was changed
        modified: The last time the resource was modified
    """

    path: str
    name: str
    is_dir: bool
    size: int
    type: FileSystemResourceType
    accessed: Optional[datetime] = None
    created: Optional[datetime] = None
    metadata_changed: Optional[datetime] = None
    modified: Optional[datetime] = None

    @property
    def parent(self) -> str:
        return split(self.path)[0]

    def __hash__(self) -> int:
        return hash(self.path)

@dataclass(frozen=True)  # Make it read-only
class FileSystemEvent:
    """Represents an event occurring on a filesystem.

    Args:
        event_type: The kind of event occurring to a resource
        file_system: The filesystem that the resource is found on
        path: The path to the resource, relative to the filesystem. See IFileSystem for relativity information
        resource: The resource at the time of the event. Some events imply that there is no longer a resource
            (eg. DELETED). This is only populated if the resource exists at the time of the event.
    """

    event_type: FilesystemEventType
    file_system: IFileSystem  # Abstract file system
    path: str
    resource: Optional[FileSystemResource]

class IFileSystemEventProducer(ABC):
    """Responsible for watching a single directory and producing events."""

    @abstractmethod
    def subscribe(
        self,
        on_event_cb: Callable[[FileSystemEvent], Any],
    ):
        """Subscribes to receive directory events from this directory watcher. Return of callback is not used.

        Args:
            on_event_cb: The callback to use when an event is received
        """

class LocalPollingFileSystemEventProducer(IFileSystemEventProducer):
    """An event producer using watchdog polling to make it work on samba shares.

    Please note that each watching location is run on its own thread, so callbacks will occur in that thread.
    """

    class CallbackEventHandler(FileSystemEventHandler):
        def __init__(
            self,
            base_path: str,
            filesystem: IFileSystem,
            logger: ILogger,
        ):
            self._event_callbacks: List[Callable[[FileSystemEvent], Any]] = list()
            self._base_path = base_path
            self._fs = filesystem
            self._logger = logger
            self._event_queue: Queue[FileSystemEvent] = Queue()

        def add_callback(self, on_event_cb: Callable[[FileSystemEvent], Any]):
            self._event_callbacks.append(on_event_cb)

        def on_any_event(self, event: FSEvent):
            # Ignore events on the base path itself. Unsure why these are even given.
            if event.src_path == self._base_path:
                return
            try:
                # Pylance associates event.event_type with None as internally it is instantiated as
                # event_type=None. It should be a string.
                event_type = FilesystemEventType[event.event_type.upper()]  # type: ignore
            except KeyError:
                self._logger.warning(
                    f"Directory Watcher returned unknown event: {event.event_type}"
                )
                return
            # This doesn't really work, as we don't know whether the path should be relative, nor do we know
            # where the filesystem is. But because I can't use the filesystem to produce the event in the
            # first place, this is the hack we have to use.
            relative_path = event.src_path.replace(self._base_path, "").replace(
                "\\", "/"
            )
            info = None
            if self._fs.exists(relative_path):
                info = self._fs.stat(relative_path)
            directory_event = FileSystemEvent(event_type, self._fs, relative_path, info)
            for cb in self._event_callbacks:
                cb(directory_event)

    def __init__(
        self,
        path: str,
        file_system: IFileSystem,
        logger: ILogger,
    ):
        self._observer = PollingObserver()
        self._event_handler = self.CallbackEventHandler(path, file_system, logger)
        self._observer.schedule(self._event_handler, path)
        self._observer.start()

    async def stop(self):
        self._observer.stop()
        self._observer.join()

    def subscribe(
        self,
        on_event_cb: Callable[[FileSystemEvent], Any],
    ):
        self._event_handler.add_callback(on_event_cb)

class IResourceStableTrackerEventProducer(ABC):
    """If you are not me reading this: yes, I know the name sucks. It's a hard thing to name. This interface has
    one job: take in a resource to track and determine if it is being changed. Essentially, we need something to
    tell us when a given file system resource is no longer being changed so that it can be worked with.

    TODO: Question whether being an event producer itself is a violation of SRP... Seems so overboard though.
    """

    @abstractmethod
    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        """Tracks a resource and determines when it's no longer being altered.

        Args:
            resource: The resource to track
            fs: The filesystem that the resource is on

        Raises:
            NotFound: If the resource does not exist on the filesystem
        """

    @abstractmethod
    def cancel_track(self, path: str, fs: IFileSystem):
        """Cancels a currently running tracking of a resource.

        Args:
            path: The path to the resource on the filesystem. A path is used rather than a resource because the
                resource may have been deleted.
            fs: The fs that the resource exists on
        """

    @abstractmethod
    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        """Subscribes the callback to receive events about files that are ready.

        Args:
            resource_stable_cb: The callback
        """

class ResourceStableTracker(IResourceStableTrackerEventProducer):
    """Purpose is to determine when a path is ready to be copied.

    I'm not a huge fan of how everything is so implicit, nor am I a fan of how we use the poll time.
    """

    def __init__(
        self,
        hash_service: IPathHasher,
        poll_time: float,
        loop: AbstractEventLoop,
        logger: ILogger,
    ):
        self._loop = loop
        self._hash_service = hash_service
        self._poll_time = poll_time
        self._logger = logger
        self._worker_pool = AsyncPool(
            loop, 10, "FileTracker", logger, self._resource_countdown
        )
        self._worker_pool.start()
        self._file_ready_cbs: List[
            Callable[[FileSystemResource, IFileSystem], None]
        ] = list()
        self._path_countdowns: Dict[str, Future] = dict()

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        self._loop.run_until_complete(self._add_to_pool(resource, fs))

    def cancel_track(self, path: str, fs: IFileSystem):
        if path in self._path_countdowns:
            countdown_task = self._path_countdowns[path]
            countdown_task.cancel()
            try:
                self._loop.run_until_complete(countdown_task)
            except CancelledError:
                self._logger.info(f"Successfully stopped countdown on {path}")
            self._remove_from_countdown(path)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        self._file_ready_cbs.append(resource_stable_cb)

    async def _add_to_pool(self, resource: FileSystemResource, fs: IFileSystem):
        if resource.path not in self._path_countdowns:
            fut = await self._worker_pool.push(resource, fs)
            self._path_countdowns[resource.path] = fut

    async def _hash_resource(
        self, resource: FileSystemResource, fs: IFileSystem
    ) -> bytes:
        try:
            return self._hash_service.hash(resource.path, fs)
        except FileLocked:
            # If it's locked, it can't be ready for transfer. We will return a unique ID to convey this.
            return str(uuid4()).encode()

    def _remove_from_countdown(self, path: str):
        self._path_countdowns.pop(path)

    async def _resource_countdown(self, resource: FileSystemResource, fs: IFileSystem):
        try:
            last_hash = await self._hash_resource(resource, fs)
            while True:
                await sleep(self._poll_time)
                current_hash = await self._hash_resource(resource, fs)
                if last_hash == current_hash and last_hash != b"":
                    self._logger.info(
                        f"{str(resource.path)} completely copied into directory"
                    )
                    updated_resource = fs.stat(resource.path)
                    for cb in self._file_ready_cbs:
                        cb(updated_resource, fs)
                    return
                self._logger.debug(f"{str(resource.path)} not ready")
                last_hash = current_hash
        finally:
            self._remove_from_countdown(resource.path)

class FileSystemEventResourceTrackerDecorator(IResourceStableTrackerEventProducer):
    def __init__(
        self,
        decoratee: IResourceStableTrackerEventProducer,
    ):
        self._decoratee = decoratee

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        self._decoratee.track_resource(resource, fs)

    def cancel_track(self, path: str, fs: IFileSystem):
        self._decoratee.cancel_track(path, fs)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        self._decoratee.subscribe(resource_stable_cb)

    def handle_fs_event(self, event: FileSystemEvent):
        if event.event_type == FilesystemEventType.CLOSED:
            return
        if event.event_type in (
            FilesystemEventType.DELETED,
            FilesystemEventType.MOVED,
        ):
            self.cancel_track(event.path, event.file_system)
            return
        # Sometimes modified events occur on deletion, likely if it's a large file that takes multiple poll
        # times to delete.
        if not event.file_system.exists(event.path):
            return
        if event.resource:
            self.track_resource(event.resource, event.file_system)

These are used as follows in the composition root

logger = StandardLogger(logging.getLogger())
# Converts the pyfilesystem2 fs to local code
fs = PyFileSystemAdapter(lambda url: open_fs(url, create=True, writable=True))
fs_event_producer = LocalPollingFileSystemEventProducer(config.base_path, fs, logger)
resource_tracker = FileSystemEventResourceTrackerDecorator(
    ResourceStableTracker(PartialPathHasher(), config.poll_time, asyncio.get_event_loop(), logger)
)
fs_event_producer.subscribe(resource_tracker.handle_fs_event)

I have been told before to add all relevant code, even if it's lengthy, so I have done so.

My own notes about this code:

  • I think the LocalPollingFileSystemEventProducer is kind of average. I think it would be a MAJOR improvement to inject an Observer object into the class, but when I went about this I found the watchdog code hard to extend (notably I didn't try for long, as it's not a pressing matter at the moment). A rough sketch of what I mean is below this list.
  • I like the abstraction of IResourceStableTrackerEventProducer (the name is a complete bruh), but I am wondering if it breaks SRP.
  • I am entirely unsure if the adapter/decorator in FileSystemEventResourceTrackerDecorator is a good idea for making it usable with FileSystemEvents. I do need it to be usable with them, but maybe the abstraction should be elsewhere.
  • Overall, the idea of using the events to determine when a file is ready does not work in pretty much any scenario. This is because 1) FileSystemEvents differ between filesystems, 2) the IFileSystemEventProducer interface makes no guarantee that it is the file system producing the event (i.e. it may be polling), and 3) waiting for a hash or lock to be freed just seems better.
  • I read the excellent advice in https://stackoverflow.com/questions/9892137/windsor-pulling-transient-objects-from-the-container that logging could be in a decorator (a sketch of what that might look like is also below this list). But how do you log things that don't run through the interfaces or are an implementation detail, like self._logger.debug(f"{str(resource.path)} not ready") in the ResourceStableTracker? The rest can be changed to follow that link.
  • I'm thinking of adding start and stop functions to most classes that may need them, like the event producer, so they don't produce events before they are ready; these would be called from the composition root as needed. I think it's a bad idea to add them to the interface, as it gives the impression that anything can call them when really only the composition root should call start and stop once everything is wired together, and start should ONLY be responsible for starting its own class. Is this commonly a good idea?
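For the first bullet, this is roughly the injection I have in mind. It is only a sketch: the LocalFileSystemEventProducer name and the injected observer parameter are hypothetical, and it reuses the CallbackEventHandler from the code above.

from watchdog.observers.api import BaseObserver


class LocalFileSystemEventProducer(IFileSystemEventProducer):
    """Sketch: same behaviour as LocalPollingFileSystemEventProducer, but the observer is injected."""

    def __init__(
        self,
        path: str,
        file_system: IFileSystem,
        logger: ILogger,
        observer: BaseObserver,  # e.g. PollingObserver() or Observer(), chosen by the composition root
    ):
        self._observer = observer
        self._event_handler = LocalPollingFileSystemEventProducer.CallbackEventHandler(
            path, file_system, logger
        )
        self._observer.schedule(self._event_handler, path)
        self._observer.start()

    async def stop(self):
        self._observer.stop()
        self._observer.join()

    def subscribe(self, on_event_cb: Callable[[FileSystemEvent], Any]):
        self._event_handler.add_callback(on_event_cb)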

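For the logging bullet, a minimal sketch of the kind of decorator I mean, assuming it would be wired up in the composition root like the existing decorator (the class itself is hypothetical):

class LoggingResourceTrackerDecorator(IResourceStableTrackerEventProducer):
    """Sketch: logs calls crossing the interface boundary, then delegates to the real tracker."""

    def __init__(self, decoratee: IResourceStableTrackerEventProducer, logger: ILogger):
        self._decoratee = decoratee
        self._logger = logger

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        self._logger.info(f"Tracking {resource.path}")
        self._decoratee.track_resource(resource, fs)

    def cancel_track(self, path: str, fs: IFileSystem):
        self._logger.info(f"Cancelling tracking of {path}")
        self._decoratee.cancel_track(path, fs)

    def subscribe(self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]):
        self._decoratee.subscribe(resource_stable_cb)

This only covers calls that go through the interface, which is exactly my problem: the "not ready" debug line inside ResourceStableTracker is an implementation detail and cannot be hoisted into such a decorator.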
I have two more questions which are a bit higher level:

  1. The FileSystemEventProducer is likely to be multithreaded, but the ResourceStableTracker (and many other users of the interface) may not be thread-safe. Is there a way to handle this problem without a leaky abstraction? At this moment in time the ResourceStableTracker is not thread-safe, but it doesn't even know about threads at all.
  2. Some code uses asyncio as an implementation detail (like the ResourceStableTracker), but the interface doesn't expose any async functions. As far as I can tell, interfaces should not tell you about the implementation being asynchronous or multithreaded in general, yet it can be difficult to work around these things without that knowledge. So, somewhat the same question as 1 but for asyncio: how can I deal with a class that wants to await a task or is doing some IO but doesn't necessarily want to create a task?

E: I will add solutions I come up with at the bottom until each question has one. Then I will post my own answer if there isn't already an accepted one.

For Q2, I have simply wrapped the async function in a MultithreadedToAsyncAdapter which intercepts the callback and calls asyncio.run_coroutine_threadsafe. This is injected at the composition root, meaning it is OK for that code to know that the implementation will use threads. This appears to be a very elegant solution that should be reusable. It looks something like this:

class MultithreadedToAsyncAdapter:
    def __init__(self, loop: AbstractEventLoop, cb: Callable[..., Coroutine[Any, Any, Any]]):
        self._loop = loop
        self._cb = cb

    def cb_adapter(self, *cb_params: Any) -> None:  # Whatever the cb needs to be.
        asyncio.run_coroutine_threadsafe(self._cb(*cb_params), self._loop)

As far as I can see, the handle produced from this is a concurrent.futures.Future, so apart from cancel it can also be waited on; either way, it may be a good idea to keep the handles in a queue of some sort to ensure any running tasks are finished or cancelled on shutdown.
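A rough sketch of that shutdown idea follows. The _handles list and the shutdown method are assumptions I haven't implemented yet, not part of the adapter above:

import asyncio
from concurrent.futures import Future as ThreadFuture
from typing import Any, Callable, Coroutine, List


class MultithreadedToAsyncAdapter:
    def __init__(self, loop: asyncio.AbstractEventLoop, cb: Callable[..., Coroutine[Any, Any, Any]]):
        self._loop = loop
        self._cb = cb
        self._handles: List[ThreadFuture] = []

    def cb_adapter(self, *cb_params: Any) -> None:
        # Called from the producer's thread; schedule the coroutine on the asyncio loop.
        handle = asyncio.run_coroutine_threadsafe(self._cb(*cb_params), self._loop)
        # Drop handles that have already finished so the list doesn't grow forever.
        self._handles = [h for h in self._handles if not h.done()]
        self._handles.append(handle)

    def shutdown(self, timeout: float = 5.0) -> None:
        # Give each in-flight callback a chance to finish, cancelling anything that overruns.
        for handle in list(self._handles):
            try:
                handle.result(timeout=timeout)
            except Exception:
                handle.cancel()
        self._handles.clear()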

asked Jul 18, 2022 at 5:25
  • Please, add imports to your code. Commented Jul 18, 2022 at 7:21
  • The whole module reads very Java-esque. Especially due to the I-prefixed ABCs which only provide one method (why not just use a plain function?). Also some of the names, like FileSystemEventResourceTrackerDecorator, are very verbose. Commented Jul 18, 2022 at 10:03
  • @RichardNeumann I am purposefully trying to apply the dependency injection pattern as best I can. It has quite the learning curve. In DI, single-function interfaces don't seem to be frowned upon, but for Python I would probably agree that it's overboard. As for naming, I agree and would love any suggestion for the name :p Commented Jul 18, 2022 at 22:17
  • @MiguelAlorda I have added the imports; there may be some I have missed while compiling across files though. Additionally, I have not included all the code required to run the example (namely the filesystem implementation). Commented Jul 18, 2022 at 23:12
