"""
ReadWriteLock
Taken from http://code.activestate.com/recipes/502283/
locks.py - Read-Write lock thread lock implementation
See the class documentation for more info.
Copyright (C) 2007, Heiko Wundram.
Released under the BSD-license.
"""
# Imports
# -------
from threading import Condition, Lock, current_thread
from time import time
# Read write lock
# ---------------
[docs]
class ReadWriteLock:
"""Read-Write lock class. A read-write lock differs from a standard
threading.RLock() by allowing multiple threads to simultaneously hold a
read lock, while allowing only a single thread to hold a write lock at the
same point of time.
When a read lock is requested while a write lock is held, the reader
is blocked; when a write lock is requested while another write lock is
held or there are read locks, the writer is blocked.
Writers are always preferred by this implementation: if there are blocked
threads waiting for a write lock, current readers may request more read
locks (which they eventually should free, as they starve the waiting
writers otherwise), but a new thread requesting a read lock will not
be granted one, and block. This might mean starvation for readers if
two writer threads interweave their calls to acquire_write() without
leaving a window only for readers.
In case a current reader requests a write lock, this can and will be
satisfied without giving up the read locks first, but, only one thread
may perform this kind of lock upgrade, as a deadlock would otherwise
occur. After the write lock has been granted, the thread will hold a
full write lock, and not be downgraded after the upgrading call to
acquire_write() has been match by a corresponding release().
"""
def __init__(self):
"""Initialize this read-write lock."""
# Condition variable, used to signal waiters of a change in object
# state.
self.__condition = Condition(Lock())
# Initialize with no writers.
self.__writer = None
self.__upgradewritercount = 0
self.__pendingwriters = []
# Initialize with no readers.
self.__readers = {}
[docs]
def acquire_read(self, *, timeout=None):
"""Acquire a read lock for the current thread, waiting at most
timeout seconds or doing a non-blocking check in case timeout is <= 0.
In case timeout is None, the call to acquire_read blocks until the
lock request can be serviced.
In case the timeout expires before the lock could be serviced, a
RuntimeError is thrown."""
if timeout is not None:
endtime = time() + timeout
me = current_thread()
self.__condition.acquire()
try:
if self.__writer is me:
# If we are the writer, grant a new read lock, always.
self.__writercount += 1
return
while True:
if self.__writer is None:
# Only test anything if there is no current writer.
if self.__upgradewritercount or self.__pendingwriters:
if me in self.__readers:
# Only grant a read lock if we already have one
# in case writers are waiting for their turn.
# This means that writers can't easily get starved
# (but see below, readers can).
self.__readers[me] += 1
return
# No, we aren't a reader (yet), wait for our turn.
else:
# Grant a new read lock, always, in case there are
# no pending writers (and no writer).
self.__readers[me] = self.__readers.get(me, 0) + 1
return
if timeout is not None:
remaining = endtime - time()
if remaining <= 0:
# Timeout has expired, signal caller of this.
raise RuntimeError("Acquiring read lock timed out")
self.__condition.wait(remaining)
else:
self.__condition.wait()
finally:
self.__condition.release()
[docs]
def acquire_write(self, *, timeout=None):
"""Acquire a write lock for the current thread, waiting at most
timeout seconds or doing a non-blocking check in case timeout is <= 0.
In case the write lock cannot be serviced due to the deadlock
condition mentioned above, a ValueError is raised.
In case timeout is None, the call to acquire_write blocks until the
lock request can be serviced.
In case the timeout expires before the lock could be serviced, a
RuntimeError is thrown."""
if timeout is not None:
endtime = time() + timeout
me, upgradewriter = current_thread(), False
self.__condition.acquire()
try:
if self.__writer is me:
# If we are the writer, grant a new write lock, always.
self.__writercount += 1
return
elif me in self.__readers:
# If we are a reader, no need to add us to pendingwriters,
# we get the upgradewriter slot.
if self.__upgradewritercount:
# If we are a reader and want to upgrade, and someone
# else also wants to upgrade, there is no way we can do
# this except if one of us releases all his read locks.
# Signal this to user.
raise ValueError("Inevitable dead lock, denying write lock")
upgradewriter = True
self.__upgradewritercount = self.__readers.pop(me)
else:
# We aren't a reader, so add us to the pending writers queue
# for synchronization with the readers.
self.__pendingwriters.append(me)
while True:
if not self.__readers and self.__writer is None:
# Only test anything if there are no readers and writers.
if self.__upgradewritercount:
if upgradewriter:
# There is a writer to upgrade, and it's us. Take
# the write lock.
self.__writer = me
self.__writercount = self.__upgradewritercount + 1
self.__upgradewritercount = 0
return
# There is a writer to upgrade, but it's not us.
# Always leave the upgrade writer the advance slot,
# because he presumes he'll get a write lock directly
# from a previously held read lock.
elif self.__pendingwriters[0] is me:
# If there are no readers and writers, it's always
# fine for us to take the writer slot, removing us
# from the pending writers queue.
# This might mean starvation for readers, though.
self.__writer = me
self.__writercount = 1
self.__pendingwriters = self.__pendingwriters[1:]
return
if timeout is not None:
remaining = endtime - time()
if remaining <= 0:
# Timeout has expired, signal caller of this.
if upgradewriter:
# Put us back on the reader queue. No need to
# signal anyone of this change, because no other
# writer could've taken our spot before we got
# here (because of remaining readers), as the test
# for proper conditions is at the start of the
# loop, not at the end.
self.__readers[me] = self.__upgradewritercount
self.__upgradewritercount = 0
else:
# We were a simple pending writer, just remove us
# from the FIFO list.
self.__pendingwriters.remove(me)
raise RuntimeError("Acquiring write lock timed out")
self.__condition.wait(remaining)
else:
self.__condition.wait()
finally:
self.__condition.release()
[docs]
def release(self):
"""Release the currently held lock.
In case the current thread holds no lock, a ValueError is thrown."""
me = current_thread()
self.__condition.acquire()
try:
if self.__writer is me:
# We are the writer, take one nesting depth away.
self.__writercount -= 1
if not self.__writercount:
# No more write locks; take our writer position away and
# notify waiters of the new circumstances.
self.__writer = None
self.__condition.notify_all()
elif me in self.__readers:
# We are a reader currently, take one nesting depth away.
self.__readers[me] -= 1
if not self.__readers[me]:
# No more read locks, take our reader position away.
del self.__readers[me]
if not self.__readers:
# No more readers, notify waiters of the new
# circumstances.
self.__condition.notify_all()
else:
raise ValueError("Trying to release unheld lock")
finally:
self.__condition.release()