# -*- coding: UTF-8 -*-
"""Common tools."""
import os
import re
from multiprocessing import Process, Queue
from itertools import islice
import logging
[docs]def ifd(d, v1, v2, v3):
"""
Conditionally select a value depending on the given dimension number.
Args:
d (int): Dimension number (1, 2, or 3).
v1: The value if d = 1.
v2: The value if d = 2.
v3: The value if d = 3.
Returns:
v1, v2 or v3.
"""
if d == 1:
return v1
elif d == 2:
return v2
elif d == 3:
return v3
else:
raise ValueError("Invalid dimension: %s." % d)
[docs]def human_order_key(text):
"""
Key function to sort in human order.
"""
# This is based in http://nedbatchelder.com/blog/200712/human_sorting.html
return [int(c) if c.isdigit() else c for c in re.split('(\d+)', text)]
[docs]def ensure_dir_exists(path):
"""
Ensure a directory exists, creating it if needed.
Args:
path (str): The path to the directory.
Raises:
OSError: An error occurred when creating the directory.
"""
if path: # Empty dir (cwd) always exists
try:
# Will fail either if exists or unable to create it
os.makedirs(path)
except OSError:
if os.path.exists(path):
# Directory did [probably] exist
pass
else:
# There was an error on creation, so make sure we know about it
raise OSError("Unable to create directory " + path)
[docs]def get_dir_size(dir_path):
"""Get the size of a directory in bytes."""
total_size = 0
for root, dirs, files in os.walk(dir_path):
for f in files:
total_size += os.path.getsize(os.path.join(root, f))
return total_size
[docs]def ensure_executable(path, all_users=None):
"""
Ensure a file is executable.
Args:
path (str): the path to the file.
all_users (bool): whether it should be make executable for the user or for all users.
"""
st = os.stat(path)
os.chmod(path, st.st_mode | (0o111 if all_users else 0o100))
[docs]class Call:
"""Objectization of a call to be made. When an instance is called, such a call will be made."""
[docs] def __init__(self, fn, *args, **kwargs):
self.fn = fn
self.args = args
self.kwargs = kwargs
[docs] def __call__(self):
return self.fn(*self.args, **self.kwargs)
def _caller(q):
"""Execute calls from a Queue until its value is 'END'."""
os.setpgrp() # To avoid inheriting SIGINT
while True:
data = q.get()
if data == "END":
break
else:
data() # Call the supplied argument (lambda o Call instance)
[docs]class MPCaller:
"""
MultiProcessing Caller. Makes calls using multiple subprocesses.
Attributes:
processes (list of multiprocessing.Process): Processes managed by the instance.
"""
[docs] def __init__(self, num_threads=2):
self._queue = Queue()
self.processes = []
self.spawn_threads(num_threads)
self._ends_in_queue = 0
[docs] def __repr__(self):
return "MPCaller<%d threads, %d tasks in _queue>" % (
len(self.processes), self._queue.qsize() - self._ends_in_queue)
[docs] def spawn_threads(self, num_threads):
"""Create the required number of processes and add them to the caller.
This does not remove previously created processes.
"""
for _ in range(num_threads):
t = Process(target=_caller, args=(self._queue,))
t.daemon = True
t.start()
self.processes.append(t)
[docs] def add_call(self, call):
"""
Add a call to the instance's stack.
Args:
call (Callable): A function whose call method will be invoked by the processes. Consider using lambda
functions or a :class:`Call` instance.
"""
self._queue.put(call)
[docs] def wait_calls(self, blocking=True, respawn=False):
"""
Ask all processes to consume the queue and stop after that.
Args:
blocking (bool): Whether to block the call, waiting for processes termination.
respawn (bool): If blocking is True, this indicates whether to respawn the threads after the calls finish.
If blocking is not True this is ignored (no automatic respawn if non-blocking).
"""
num_threads = len(self.processes)
for _ in range(num_threads):
self._queue.put("END")
self._ends_in_queue += 1
if blocking:
for t in self.processes:
t.join()
self.processes = []
self._ends_in_queue = 0
if respawn:
self.spawn_threads(num_threads)
[docs] def abort(self, interrupt=False):
"""
Remove all queued calls and ask processes to stop.
Args:
interrupt: If True, terminate all processes.
"""
for _ in range(len(self.processes) + 1):
self._queue.put("END")
self._ends_in_queue += 1
while True:
# Consume all calls in queue
data = self._queue.get()
if data == "END":
# If we consumed an END (we probably will), put it back
self._queue.put("END")
break
# Otherwise do nothing
if interrupt:
for t in self.processes:
t.terminate()
# If the killed process was trying to use the Queue it could have corrupted
# Just in case, create a new one
self._queue = Queue()
self._ends_in_queue = 0
[docs]def head(path, lines=10):
"""
Get the first lines of a file.
Args:
path (str): Path to the file to read.
lines (int): Number of lines to read.
Returns:
:obj:`list` of :obj:`str`: The lines found.
"""
with open(path) as f:
return list(islice(f, lines))
[docs]def tail(path, lines=1, _step=4098):
"""
Get the last lines of a file.
Args:
path (str): Path to the file to read.
lines (int): Number of lines to read.
_step (int): Size of the step used in the search.
Returns:
:obj:`list` of :obj:`str`: The lines found.
"""
# Adapted from glenbot's answer to:
# http://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
f = open(path, "r")
lines_found = []
block_counter = -1
while len(lines_found) < lines:
try:
f.seek(block_counter * _step, os.SEEK_END)
except IOError: # either file is too small, or too many lines requested
# read all and exit loop
f.seek(0)
lines_found = f.readlines()
break
lines_found = f.readlines()
if len(lines_found) > lines:
break
block_counter -= 1
return lines_found[-lines:]
logging.basicConfig(level=logging.INFO)
logger = logging.Logger("duat")