Modules

Worker

Add concurrency to methods and functions in a single line of code.

Thread

lox.worker.thread(max_workers, daemon=None)

Decorator to execute a function in multiple threads.

Example:

>>> import lox
>>>
>>> @lox.thread(4) # Will operate with a maximum of 4 threads
... def foo(x,y):
...     return x*y
>>> foo(3,4)
12
>>> for i in range(5):
...     foo.scatter(i, i+1)
>>> # foo is currently being executed in 4 threads
>>> results = foo.gather()
>>> print(results)
[0, 2, 6, 12, 20]
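
To make the scatter/gather contract concrete, the following sketch reproduces the same behavior using only the standard library. It is not how lox is implemented: ScatterGatherSketch and multiply are hypothetical names, and concurrent.futures.ThreadPoolExecutor stands in for lox's own worker management.

```python
from concurrent.futures import ThreadPoolExecutor


class ScatterGatherSketch:
    """Hypothetical stand-in for a decorated function (not lox's implementation)."""

    def __init__(self, func, max_workers):
        self._func = func
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []

    def __call__(self, *args, **kwargs):
        # Plain call: run in the caller's thread, like an undecorated function.
        return self._func(*args, **kwargs)

    def scatter(self, *args, **kwargs):
        # Submit the job and return its index into the eventual gather() list.
        self._futures.append(self._pool.submit(self._func, *args, **kwargs))
        return len(self._futures) - 1

    def gather(self):
        # Block until every job is done; results keep the scatter order.
        results = [future.result() for future in self._futures]
        self._futures = []
        return results


def multiply(x, y):
    return x * y


foo_sketch = ScatterGatherSketch(multiply, max_workers=4)
indices = [foo_sketch.scatter(i, i + 1) for i in range(5)]
results = foo_sketch.gather()
print(indices)  # [0, 1, 2, 3, 4]
print(results)  # [0, 2, 6, 12, 20]
```

Results come back in submission order because gather() walks the futures in the order they were created, regardless of which thread finishes first.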

Multiple decorated functions can be chained together, each function drawing from its own pool of threads. Functions that return tuples will automatically unpack into the chained function. Positional arguments and keyword arguments can be passed in as they normally would.

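>>> @lox.thread(2) # Hypothetical second stage, added so the example is complete
... def bar(x, y):
...     return x + y
>>>
>>> for i in range(5):
...     foo_res = foo.scatter(i, i+1)
...     bar.scatter(foo_res, 10) # scatter will automatically unpack the results of foo
>>>
>>> results = bar.gather()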

Currently, a scatter call can take at most one previous scatter result as an input argument. However, an unlimited number of functions can be chained together in any topology.
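
The unpacking rule for chained calls can be illustrated in plain Python. This is a sketch of the idea only, under the assumption that tuple results are spread across positional arguments; call_chained and its auto_unpack flag are hypothetical and are not part of lox.

```python
def call_chained(func, upstream_result, *extra_args, auto_unpack=True):
    """Hypothetical helper: feed one stage's result into the next stage."""
    if auto_unpack and isinstance(upstream_result, tuple):
        # Tuple results are spread across the next function's positional arguments.
        return func(*upstream_result, *extra_args)
    # Otherwise the result is passed through as a single argument.
    return func(upstream_result, *extra_args)


def split(x):
    return x, x + 1  # returns a tuple


def add3(a, b, c):
    return a + b + c


def describe(pair, suffix):
    return f"{pair}{suffix}"


# Equivalent to add3(1, 2, 10)
unpacked = call_chained(add3, split(1), 10)
# Equivalent to describe((1, 2), "!")
passthrough = call_chained(describe, split(1), "!", auto_unpack=False)
print(unpacked)     # 13
print(passthrough)  # (1, 2)!
```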

Parameters:max_workers (int) – Maximum number of threads to invoke. When lox.thread is used without (), a default max_workers of 50 is used.
lox.worker.__call__(*args, **kwargs)

Vanilla passthrough function execution. Default user function behavior.

Returns:Return of decorated function.
Return type:Decorated function return type.
lox.worker.__len__()
Returns:Current job queue length. Number of jobs that are currently waiting for an available worker.
Return type:int
lox.worker.scatter(*args, **kwargs)

Start a job executing decorated function func( *args, **kwargs ). Workers are created and destroyed automatically.

Returns:Solution’s index into the results obtained via gather().
Return type:int
lox.worker.gather()

Block until all jobs called via scatter() are complete.

Returns:Results in the order that scatter was invoked.
Return type:list
lox.worker.disable_auto_unpacking()

Do not unpack previously chained input tuples.

lox.worker.enable_auto_unpacking()

Automatically unpack previously chained input tuples.

Process

lox.worker.process(n_workers)

Decorator to execute a function/method in multiple processes.

Example:

>>> import lox
>>>
>>> @lox.process(4) # Will operate with a maximum of 4 processes
... def foo(x,y):
...     return x*y
>>> foo(3,4)
12
>>> for i in range(5):
...     foo.scatter(i, i+1)
>>> # foo is currently being executed in 4 processes
>>> results = foo.gather()
>>> print(results)
[0, 2, 6, 12, 20]
Parameters:n_workers (int) – Number of process workers to invoke. Defaults to number of CPU cores.
lox.worker.__call__(*args, **kwargs)

Vanilla passthrough function execution. Default user function behavior.

Returns:Return of decorated function.
Return type:Decorated function return type.
lox.worker.__len__()
Returns:Job queue length.
Return type:int
lox.worker.scatter(*args, **kwargs)

Start a job executing func( *args, **kwargs ). Workers are created and destroyed automatically.

Returns:Solution’s index into the results obtained via gather().
Return type:int
lox.worker.gather()

Block until all jobs called via scatter() are complete.

Returns:Results in the order that scatter was invoked.
Return type:list

Lock

Concurrency control objects to help parallelized tasks communicate and share resources.

LightSwitch

class lox.lock.LightSwitch(lock, multiprocessing=False)

Acquires a provided lock while LightSwitch is in use.

The lightswitch pattern creates a first-in-last-out synchronization mechanism. The name of the pattern is inspired by people entering a room in the physical world. The first person to enter the room turns on the lights; then, when everyone is leaving, the last person to exit turns the lights off.
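
A minimal sketch of the lightswitch pattern, using only threading from the standard library. LightSwitchSketch is a hypothetical name; it omits the timeout and multiprocessing options and is not lox's implementation.

```python
import threading


class LightSwitchSketch:
    """Hypothetical illustration of the pattern (no timeout support)."""

    def __init__(self, lock):
        self.lock = lock                # the "lights": held while anyone is inside
        self.counter = 0                # number of holders currently inside
        self._mutex = threading.Lock()  # protects the counter

    def acquire(self):
        with self._mutex:
            self.counter += 1
            if self.counter == 1:
                # First one in turns the lights on.
                self.lock.acquire()

    def release(self):
        with self._mutex:
            self.counter -= 1
            if self.counter == 0:
                # Last one out turns the lights off.
                self.lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


room = threading.Lock()
switch = LightSwitchSketch(room)

with switch:
    with switch:
        held_inside = room.locked()      # True: two holders, lock taken once
        count_inside = switch.counter    # 2
    held_after_one_exit = room.locked()  # True: one holder remains
held_after_all_exit = room.locked()      # False: last holder released the lock
```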

lock

The lock provided to the constructor that may be acquired/released by LightSwitch.

Type:threading.Lock
counter

Number of times the LightSwitch has been acquired without release.

Type:int
__enter__()

Acquire LightSwitch at context enter.

__exit__(exc_type, exc_val, exc_tb)

Release LightSwitch at context exit.

__len__()

Get the counter value.

Returns:counter value (number of times lightswitch has been acquired).
Return type:int
acquire(timeout=-1)

Acquire the LightSwitch and increment the internal counter.

When the internal counter is incremented from zero, it will acquire the provided lock.

Parameters:timeout (float) – Maximum number of seconds to wait before aborting.
Returns:True on success, False on failure (such as a timeout).
Return type:bool
release()

Release the LightSwitch by decrementing the internal counter.

When the internal counter is decremented to zero, it will release the provided lock.

RWLock

class lox.lock.RWLock

Lock for a Multi-Reader-Single-Writer scenario.

An unlimited number of readers can hold the lock at once. As soon as a writer attempts to acquire the lock, new readers are blocked until the current readers finish, the writer acquires the lock, and the writer releases it.

Similar to a lox.LightSwitch, but blocks incoming “readers” while a “writer” is waiting to acquire the lock.
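
One common way to get this behavior is a lightswitch plus a "turnstile" lock that a writer holds to stop new readers. The sketch below assumes that design; RWLockSketch is a hypothetical name, timeouts are omitted, and lox's actual implementation may differ.

```python
import threading


class RWLockSketch:
    """Hypothetical multi-reader/single-writer lock (no timeout support)."""

    def __init__(self):
        self._turnstile = threading.Lock()   # held by a writer to stop new readers
        self._room_empty = threading.Lock()  # held while readers or a writer are inside
        self._mutex = threading.Lock()       # protects read_counter
        self.read_counter = 0

    def acquire(self, rw_flag):
        if rw_flag == "r":
            with self._turnstile:
                pass  # pass through; blocks here while a writer waits or writes
            with self._mutex:
                self.read_counter += 1
                if self.read_counter == 1:
                    self._room_empty.acquire()  # first reader locks out writers
        elif rw_flag == "w":
            self._turnstile.acquire()   # stop new readers from entering
            self._room_empty.acquire()  # wait for current readers to leave
        else:
            raise ValueError("rw_flag must be 'r' or 'w'")
        return True

    def release(self, rw_flag):
        if rw_flag == "r":
            with self._mutex:
                self.read_counter -= 1
                if self.read_counter == 0:
                    self._room_empty.release()  # last reader lets writers in
        elif rw_flag == "w":
            self._room_empty.release()
            self._turnstile.release()
        else:
            raise ValueError("rw_flag must be 'r' or 'w'")


rw = RWLockSketch()

# Two readers can hold the lock at the same time.
rw.acquire("r")
rw.acquire("r")
readers_inside = rw.read_counter
rw.release("r")
rw.release("r")

# While a writer holds the lock, a new reader has to wait.
rw.acquire("w")
reader_got_in = threading.Event()


def reader():
    rw.acquire("r")
    reader_got_in.set()
    rw.release("r")


reader_thread = threading.Thread(target=reader)
reader_thread.start()
blocked_while_writing = not reader_got_in.wait(timeout=0.1)
rw.release("w")
reader_thread.join()
admitted_after_write = reader_got_in.is_set()
```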

read_counter

Number of readers that have acquired the lock.

Type:int
__len__()

Get the read_counter value.

Returns:Number of current readers
Return type:int
acquire(rw_flag: str, timeout=-1)

Acquire the lock as a “reader” or a “writer”.

Parameters:
  • rw_flag (str) – Either ‘r’ for ‘read’ or ‘w’ for ‘write’ acquire.
  • timeout (float) – Time in seconds before timeout occurs for acquiring lock.
Returns:

True if lock was acquired, False otherwise.

Return type:

bool

release(rw_flag: str)

Release acquired lock.

Parameters:rw_flag (str) – Either ‘r’ for a ‘read’ release or ‘w’ for a ‘write’ release.

QLock

class lox.lock.QLock

Lock that guarantees FIFO operation. Approximately 6x slower than a normal Lock().

Modified from https://stackoverflow.com/a/19695878

__enter__()

Acquire QLock at context enter.

__exit__(exc_type, exc_val, exc_tb)

Release QLock at context exit.

__len__()
Returns:Number of tasks waiting to acquire.
Return type:int
acquire(timeout=-1)

Block until resource is available.

Threads that call acquire obtain resource FIFO.

Parameters:timeout (float) – Maximum number of seconds to wait before aborting.
Returns:True on successful acquire, False on timeout.
Return type:bool
locked

Whether or not the QLock is acquired.

release()

Release exclusive access to resource.

Raises:ValueError – Lock released more times than it has been acquired.
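
A FIFO lock can be built by giving each waiter its own private lock and handing ownership over in queue order. The sketch below follows that general idea; FifoLockSketch and worker are hypothetical names, timeouts are omitted, and this is not a copy of lox's code.

```python
import threading
import time
from collections import deque


class FifoLockSketch:
    """Hypothetical FIFO lock: waiters are served in arrival order."""

    def __init__(self):
        self._mutex = threading.Lock()  # protects _waiters and _held
        self._waiters = deque()         # one private lock per waiting thread
        self._held = False

    def acquire(self):
        with self._mutex:
            if not self._held:
                self._held = True
                return True
            ticket = threading.Lock()
            ticket.acquire()
            self._waiters.append(ticket)
        ticket.acquire()  # blocks until release() hands ownership to this waiter
        return True

    def release(self):
        with self._mutex:
            if not self._held:
                raise ValueError("lock released more times than acquired")
            if self._waiters:
                # Hand ownership to the oldest waiter; the lock stays held.
                self._waiters.popleft().release()
            else:
                self._held = False

    def __len__(self):
        with self._mutex:
            return len(self._waiters)


fifo = FifoLockSketch()
order = []


def worker(name):
    fifo.acquire()
    order.append(name)
    fifo.release()


fifo.acquire()
threads = []
for name in range(3):
    thread = threading.Thread(target=worker, args=(name,))
    thread.start()
    while len(fifo) <= name:  # wait until this worker is queued before starting the next
        time.sleep(0.001)
    threads.append(thread)
fifo.release()
for thread in threads:
    thread.join()
print(order)  # [0, 1, 2]
```

The extra bookkeeping on every contended acquire is one plausible reason a FIFO lock is slower than a plain Lock().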

IndexSemaphore

class lox.lock.IndexSemaphore(val)

BoundedSemaphore-like object where acquires return an index from [0, val).

Example use case: thread acquiring a GPU.

Example

>>> sem = IndexSemaphore(4)
>>> with sem() as index:
...     print("Obtained resource %d" % (index,))
Obtained resource 0
__len__()
Returns:Current blocked queue size.
Return type:int
acquire(timeout=None)

Blocking acquire resource.

Parameters:timeout (float) – Maximum number of seconds to wait before returning.
Returns:Resource index on successful acquire. None on timeout.
Return type:int
release(index)

Release resource at index.

Parameters:index (int) – Index of resource to release.
Raises:Exception – Resource has been released more times than acquired.
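
The idea of a semaphore that hands out indices can be sketched with a queue.Queue holding the free indices. IndexPoolSketch and its reserve() context manager are hypothetical names; unlike the class documented above, this sketch does not detect a resource being released more times than it was acquired.

```python
import queue
from contextlib import contextmanager


class IndexPoolSketch:
    """Hypothetical pool that hands out indices in [0, val)."""

    def __init__(self, val):
        self._free = queue.Queue()
        for i in range(val):
            self._free.put(i)

    def acquire(self, timeout=None):
        # Return a free index, or None if none became free within the timeout.
        try:
            return self._free.get(timeout=timeout)
        except queue.Empty:
            return None

    def release(self, index):
        self._free.put(index)

    @contextmanager
    def reserve(self, timeout=None):
        index = self.acquire(timeout=timeout)
        try:
            yield index
        finally:
            if index is not None:
                self.release(index)


pool = IndexPoolSketch(2)
first = pool.acquire()               # 0
second = pool.acquire()              # 1
third = pool.acquire(timeout=0.01)   # None: both indices are in use
pool.release(first)
with pool.reserve() as index:
    reused = index                   # 0: the index released above
```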

Queue

Announcement

class lox.queue.Announcement(maxsize=0, backlog=None)

Push to many queues with backlog support.

Allows the pushing of data to many threads.

Example:

>>> import lox
>>> ann = lox.Announcement()
>>> foo_q = ann.subscribe()
>>> bar_q = ann.subscribe()
>>>
>>> @lox.thread
... def foo():
...     x = foo_q.get()
...     return x
>>>
>>> @lox.thread
... def bar():
...     x = bar_q.get()
...     return x**2
>>>
>>> ann.put(5)
>>> foo.scatter()
>>> foo_res = foo.gather()
>>> bar.scatter()
>>> bar_res = bar.gather()

The backlog allows subscribers that join later (for example, after losing a race with an early put) to receive content that was put before they subscribed. However, the user must be careful of memory consumption.
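
The fan-out and backlog behavior can be sketched with one queue.Queue per subscriber. FanOutSketch is a hypothetical name, and its boolean backlog flag is an assumption made for this illustration; it does not mirror the constructor signature documented above.

```python
import queue
import threading
from collections import deque


class FanOutSketch:
    """Hypothetical one-to-many queue with an optional replay backlog."""

    def __init__(self, backlog=False):
        self._lock = threading.Lock()
        self._subscribers = []
        self.backlog = deque() if backlog else None

    def subscribe(self):
        q = queue.Queue()
        with self._lock:
            if self.backlog is not None:
                # Replay everything that was put before this subscriber joined.
                for item in self.backlog:
                    q.put(item)
            self._subscribers.append(q)
        return q

    def put(self, item):
        with self._lock:
            if self.backlog is not None:
                self.backlog.append(item)  # grows until discarded: watch memory
            for q in self._subscribers:
                q.put(item)

    def __len__(self):
        with self._lock:
            return len(self._subscribers)


with_backlog = FanOutSketch(backlog=True)
early = with_backlog.subscribe()
with_backlog.put(5)
late = with_backlog.subscribe()   # joins after the put, still receives 5
early_item = early.get_nowait()
late_item = late.get_nowait()

no_backlog = FanOutSketch()
no_backlog.put(5)
too_late = no_backlog.subscribe()  # nothing is replayed without a backlog
missed = too_late.empty()
```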

backlog

Backlog of queued data. None if not used.

Type:deque
__len__()

Get the number of subscribers.

Returns:Number of subscribers.
Return type:int
classmethod clone(ann, q: queue.Queue = None)

Create a new announcement object that shares subscribers and resources with an existing announcement.

The only difference from the original announcement is that a new receive queue is created.

Parameters:
  • ann (lox.Announcement) – Announcement object to clone from
  • q (queue.Queue) – Receiving queue. If None, a new one is created.
Returns:

New Announcement object with copied attributes, but new q

Return type:

Announcement

empty()

Return True if the receive queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

Returns:True if the receive queue is currently empty; False otherwise.
Return type:bool
finalize()

Do not allow any more subscribers.

Primarily used for memory efficiency if backlog is used.

full()

Return True if the receive queue is full, False otherwise. If full() returns True it doesn’t guarantee that a subsequent call to get() will not block. Similarly, if full() returns False it doesn’t guarantee that a subsequent call to put() will not block.

Returns:True if the receive queue is currently full; False otherwise.
Return type:bool
get(block=True, timeout=None)

Get from the receive queue.

Parameters:
  • block (bool) – Block until data is obtained from receive queue or timeout.
  • timeout (float) – Wait up to timeout seconds before raising queue.Empty. Defaults to no timeout.
Returns:

Item from the receive queue.

Raises:

queue.Empty – When there are no elements in queue and timeout has been reached.

put(item, block=True, timeout=None)

Put item into all subscribers’ queues.

Parameters:
  • item – data to put onto all subscribers’ queues
  • block (bool) – Block until data is put on queues or timeout.
  • timeout (float) – Wait up to timeout seconds before raising queue.Full. Defaults to no timeout.
qsize()

Return the approximate size of the receive queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.

Returns:approximate size of the receive queue.
Return type:int
subscribe(q=None, maxsize=None, block=True, timeout=None)

Subscribe to announcements.

Parameters:
  • q (Queue) – Existing queue to add to the subscribers’ list. If not provided, a queue is created.
  • maxsize (int) – Created queue’s maximum size. Overrides Announcement’s default maximum size. Ignored if q is provided.
  • block (bool) – Block until data from backlog is put on queues or timeout.
  • timeout (float) – Wait up to timeout seconds before raising queue.Full. Defaults to no timeout.
Returns:

Announcement object for the receiver to get data from and put data to.

Return type:

Announcement

unsubscribe(q)

Remove the queue from queue-list. Will no longer receive announcements.

Parameters:q (Queue) – Queue object to remove.
Raises:ValueError – q was not a subscriber.

Funnel

class lox.queue.Funnel

Wait on many queues.

>>> import queue
>>> import lox
>>>
>>> funnel = lox.Funnel()
>>> sub_1 = funnel.subscribe()
>>> sub_2 = funnel.subscribe()
>>>
>>> sub_1.put('foo', 'job_id')
>>> try:
...     res = funnel.get(timeout=0.01)
... except queue.Empty:
...     print("Timed Out")
Timed Out
>>> sub_2.put('bar', 'job_id')
>>> res = funnel.get()
>>> print(len(res))
3
>>> print(res)
['job_id', 'foo', 'bar']
index

Index into list of solutions (if a subscriber). -1 otherwise. Note: if get(return_jid=True) then this is offset by one.

Type:int
__len__()

Gets number of input queues.

Returns:Number of input queues.
Return type:int
get(block=True, timeout=None, return_jid=True)

Get from the receive queue. Will return the contents of each input queue, in the order subscribed, as a tuple.

Parameters:
  • block (bool) – Block until data is obtained from receive queue or timeout.
  • timeout (float) – Wait up to timeout seconds before raising queue.Empty. Defaults to no timeout.
  • return_jid (bool) – Have the Job ID as the first element of the returned tuple. Defaults to True
Returns:

items from input queues.

Return type:

tuple

Raises:

queue.Empty – When there are no elements in queue and timeout has been reached.

put(item, jid, blocking=True, timeout=-1)
Parameters:
  • item – data to put onto all subscribers’ queues
  • jid (hashable) – unique identifier for job.
  • blocking (bool) – Block until data is put on queues or timeout.
  • timeout (float) – Wait up to timeout seconds before raising queue.Full. Defaults to no timeout.
Returns:

True if item was successfully added; False otherwise.

Return type:

bool

Raises:

FunnelPutTopError – Can only put onto subscribers, not the top/master Funnel.

subscribe()

Create a new Funnel for data to be put on.

Returns:A funnel object that is a required input on get calls.
Return type:Funnel
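
The "wait on many queues" behavior amounts to joining items by job ID: a result is released only once every input has contributed an item for that ID. The sketch below shows that idea with the standard library. JoinByJobIdSketch and add_input are hypothetical names, and the sketch assumes every input is registered before the first put; it is not lox's implementation.

```python
import queue
import threading

_MISSING = object()  # marks a slot that has not received its item yet


class JoinByJobIdSketch:
    """Hypothetical join: emit (jid, item_0, item_1, ...) once all inputs report."""

    def __init__(self):
        self._lock = threading.Lock()
        self._n_inputs = 0
        self._partial = {}              # jid -> list of slots, one per input
        self._complete = queue.Queue()  # fully assembled results

    def add_input(self):
        # Register a new input and return the function used to put onto it.
        with self._lock:
            index = self._n_inputs
            self._n_inputs += 1

        def put(item, jid):
            self._put(index, item, jid)

        return put

    def _put(self, index, item, jid):
        with self._lock:
            slots = self._partial.setdefault(jid, [_MISSING] * self._n_inputs)
            slots[index] = item
            if all(slot is not _MISSING for slot in slots):
                del self._partial[jid]
                self._complete.put((jid, *slots))

    def get(self, block=True, timeout=None):
        return self._complete.get(block=block, timeout=timeout)


join = JoinByJobIdSketch()
put_1 = join.add_input()
put_2 = join.add_input()

put_1("foo", "job_id")
try:
    join.get(timeout=0.01)
    timed_out = False
except queue.Empty:
    timed_out = True   # only one of the two inputs has reported so far

put_2("bar", "job_id")
joined = join.get()
print(joined)  # ('job_id', 'foo', 'bar')
```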