# Source code for atomos.multiprocessing.atomic

# -*- coding: utf-8 -*-
'''
atomos.multiprocessing.atomic

Atomic primitives for multiprocessing.
'''

import types
import ctypes
import multiprocessing
import multiprocessing.managers

import six

import atomos.util as util
import atomos.atomic as atomic

if six.PY3:
    long = int


class _AtomicReference(atomic.AtomicReference):
    '''
    Multiprocessing flavour of `atomic.AtomicReference`.

    Holds a single object and allows it to be manipulated with atomic
    semantics, which is particularly useful when the object cannot otherwise
    be manipulated atomically.
    '''
    def __init__(self, value=None):
        '''
        :param value: The initial value handed to the base class.
        '''
        # The base initialiser must run first; `_lock` is assigned afterwards
        # so that the multiprocessing readers-writer lock is the one in use.
        super(_AtomicReference, self).__init__(value=value)
        self._lock = util.ReadersWriterLockMultiprocessing()

    def __repr__(self):
        '''
        Returns the representation built by `util.repr` from the current
        value.
        '''
        current = self._value
        return util.repr(__name__, self, current)

    def _proxy_value(self):
        '''
        Returns the raw `_value` attribute.

        Exposed as a method because the manager registration in this module
        exposes methods by name.
        '''
        current = self._value
        return current


class AtomicManager(multiprocessing.managers.BaseManager):
    '''
    Manager on which this module registers its shared atomic types.

    No behaviour is added to `BaseManager`; the subclass only provides a
    dedicated class to call `register` on.
    '''
    pass


# Register the multiprocessing-backed reference class on the manager under the
# name 'AtomicReference'. `exposed` lists the method names made available on
# the proxy objects the manager hands out.
AtomicManager.register('AtomicReference',
                       _AtomicReference,
                       exposed=['get',
                                'set',
                                'get_and_set',
                                'compare_and_set',
                                '_proxy_value',
                                '__repr__'])

# HACK: This is a bit of a hack. Essentially we need to run a manager which
# specifically exposes the multiprocessing version of AtomicReference. If we
# don't do this then memory cannot be shared between processes.
#
# The manager is created and started as a side effect of importing this
# module, and the public `AtomicReference` name is bound to the manager's
# factory rather than to the `_AtomicReference` class itself.
# NOTE(review): `_started` is assigned False on the line directly before it is
# tested, so within a single execution of this module the guard is always
# taken.
_started = False
if not _started:
    _manager = AtomicManager()
    _manager.start()
    AtomicReference = _manager.AtomicReference
    _started = True


class AtomicCtypesReference(object):
    '''
    A reference to a ctypes value which allows atomic manipulation semantics.

    The value is held in a `multiprocessing.Value`; every operation defined
    on this class runs while holding that value's lock.

    Only ctypes data types are supported.
    https://docs.python.org/3.4/library/ctypes.html#fundamental-data-types
    '''
    def __init__(self, typecode_or_type=None, value=None):
        '''
        Allocates the underlying `multiprocessing.Value`.

        :param typecode_or_type: The type of object allocated from shared
            memory.
        :param value: The initial value.
        '''
        self._typecode_or_type = typecode_or_type
        self._reference = multiprocessing.Value(typecode_or_type, value)

    def __repr__(self):
        shared = self._reference
        return util.repr(__name__, self, shared)

    def get(self):
        '''
        Returns the current value, read while holding the lock.
        '''
        shared = self._reference
        with shared.get_lock():
            current = shared.value
        return current

    def set(self, value):
        '''
        Atomically sets the value to `value`.

        :param value: The value to set.
        :returns: The `value` argument exactly as it was passed in.
        '''
        shared = self._reference
        with shared.get_lock():
            shared.value = value
        return value

    def get_and_set(self, value):
        '''
        Atomically sets the value to `value` and returns the old value.

        :param value: The value to set.
        :returns: The value held before the update.
        '''
        shared = self._reference
        with shared.get_lock():
            # The right-hand side is evaluated first, so the old value is
            # captured before the new one is stored.
            previous, shared.value = shared.value, value
        return previous

    def compare_and_set(self, expect, update):
        '''
        Atomically sets the value to `update` if the current value is equal to
        `expect`.

        :param expect: The expected current value.
        :param update: The value to set if and only if `expect` equals the
            current value.
        :returns: True when the value was replaced, False otherwise.
        '''
        shared = self._reference
        with shared.get_lock():
            if not (shared.value == expect):
                return False

            shared.value = update
            return True


class AtomicBoolean(AtomicCtypesReference):
    '''
    A boolean value which allows atomic manipulation semantics.

    The value is stored as a `ctypes.c_bool`.
    '''
    def __init__(self, value=False):
        '''
        :param value: The initial boolean value.
        '''
        super(AtomicBoolean, self).__init__(typecode_or_type=ctypes.c_bool,
                                            value=value)

    # We do not need a locked get since a boolean is not a complex data type.
    def get(self):
        '''
        Returns the value without taking the lock.
        '''
        return self._reference.value

    def __setattr__(self, name, value):
        '''
        Sets attribute `name`, rejecting non-bool values for `_value`.

        :param name: The attribute name.
        :param value: The attribute value.
        :raises TypeError: If `name` is `_value` and `value` is not a bool.
        '''
        # Ensure the `_value` attribute is always a bool. The builtin `bool`
        # is used rather than `types.BooleanType`, which only exists on
        # Python 2 and would raise AttributeError on Python 3.
        if name == '_value' and not isinstance(value, bool):
            raise TypeError('_value must be of type bool')
        super(AtomicBoolean, self).__setattr__(name, value)
class AtomicNumber(AtomicCtypesReference):
    '''
    Common base for the numeric atomic types.

    Supplies the arithmetic helpers shared by AtomicInteger, AtomicLong, and
    AtomicFloat.
    '''
    # We do not need a locked get since numbers are not complex data types.
    def get(self):
        '''
        Returns the value without taking the lock.
        '''
        shared = self._reference
        return shared.value

    def add_and_get(self, delta):
        '''
        Atomically adds `delta` to the current value.

        :param delta: The delta to add.
        :returns: The value read back after the addition.
        '''
        shared = self._reference
        with shared.get_lock():
            shared.value = shared.value + delta
            # Read back from the shared value rather than returning the sum
            # computed above.
            return shared.value

    def get_and_add(self, delta):
        '''
        Atomically adds `delta` to the current value and returns the old
        value.

        :param delta: The delta to add.
        :returns: The value held before the addition.
        '''
        shared = self._reference
        with shared.get_lock():
            previous = shared.value
            shared.value = previous + delta
            return previous

    def subtract_and_get(self, delta):
        '''
        Atomically subtracts `delta` from the current value.

        :param delta: The delta to subtract.
        :returns: The value read back after the subtraction.
        '''
        shared = self._reference
        with shared.get_lock():
            shared.value = shared.value - delta
            return shared.value

    def get_and_subtract(self, delta):
        '''
        Atomically subtracts `delta` from the current value and returns the
        old value.

        :param delta: The delta to subtract.
        :returns: The value held before the subtraction.
        '''
        shared = self._reference
        with shared.get_lock():
            previous = shared.value
            shared.value = previous - delta
            return previous
class AtomicInteger(AtomicNumber):
    '''
    An integer value which allows atomic manipulation semantics.

    The value is stored as a `ctypes.c_int`.
    '''
    def __init__(self, value=0):
        '''
        :param value: The initial integer value.
        '''
        super(AtomicInteger, self).__init__(ctypes.c_int, value)

    def __setattr__(self, name, value):
        '''
        Sets attribute `name`, rejecting non-int values for `_value`.

        :raises TypeError: If `name` is `_value` and `value` is not an int.
        '''
        # Ensure the `_value` attribute is always an int.
        if name == '_value':
            if not isinstance(value, int):
                raise TypeError('_value must be of type int')
        super(AtomicInteger, self).__setattr__(name, value)
class AtomicLong(AtomicNumber):
    '''
    A long value which allows atomic manipulation semantics.

    The value is stored as a `ctypes.c_long`.
    '''
    def __init__(self, value=long(0)):
        '''
        :param value: The initial long value.
        '''
        super(AtomicLong, self).__init__(ctypes.c_long, value)

    def __setattr__(self, name, value):
        '''
        Sets attribute `name`, rejecting non-long values for `_value`.

        :raises TypeError: If `name` is `_value` and `value` is not a long.
        '''
        # Ensure the `_value` attribute is always a long.
        if name == '_value':
            if not isinstance(value, long):
                raise TypeError('_value must be of type long')
        super(AtomicLong, self).__setattr__(name, value)
class AtomicFloat(AtomicNumber):
    '''
    A float value which allows atomic manipulation semantics.

    The value is stored as a `ctypes.c_double`, which has the same precision
    as a Python float. With single-precision `ctypes.c_float` a value such as
    0.1 reads back changed, so `compare_and_set(0.1, ...)` on a reference
    holding 0.1 could never succeed.
    '''
    def __init__(self, value=0.0):
        '''
        :param value: The initial float value.
        '''
        super(AtomicFloat, self).__init__(typecode_or_type=ctypes.c_double,
                                          value=value)

    def __setattr__(self, name, value):
        '''
        Sets attribute `name`, rejecting non-float values for `_value`.

        :param name: The attribute name.
        :param value: The attribute value.
        :raises TypeError: If `name` is `_value` and `value` is not a float.
        '''
        # Ensure the `_value` attribute is always a float.
        if name == '_value' and not isinstance(value, float):
            raise TypeError('_value must be of type float')
        super(AtomicFloat, self).__setattr__(name, value)