This commit is contained in:
2022-10-22 11:01:52 +08:00
commit 200117b921
105 changed files with 26944 additions and 0 deletions

View File

@ -0,0 +1,5 @@
__version__ = '0.0.8'
from .validator import Validator, create_validator
from .fields import *
from .exceptions import *

View File

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import six
from .utils import force_text, force_str
from .translation import gettext as _
def _flat_error_detail(detail):
if isinstance(detail, list):
return [_flat_error_detail(item) for item in detail]
elif isinstance(detail, dict):
return {
key: _flat_error_detail(value)
for key, value in six.iteritems(detail)
}
else:
return force_text(detail)
class BaseValidationError(Exception):
default_detail = _('Base validation error')
default_code = _('error')
def __init__(self, detail=None, code=None):
"""
:param detail: `detail` maybe a string, a dict or a list.
:param code: error code, it not used for now.
"""
if detail is None:
detail = self.default_detail
if code is None:
code = self.default_code
self.detail = _flat_error_detail(detail)
self.code = code
def get_detail(self):
return self.detail
def __str__(self):
return force_str(self.detail)
def __unicode__(self):
return force_text(self.detail)
def __repr__(self):
detail = self.detail
if len(detail) > 103:
detail = detail[:100] + '...'
return '{0}(detail={1!r})'.format(self.__class__.__name__, detail)
class FieldRequiredError(BaseValidationError):
default_detail = _('Field is required')
default_code = _('error')
class ValidationError(BaseValidationError):
default_detail = _('Validation error')
default_code = _('error')
class FieldValidationError(BaseValidationError):
default_detail = _('field Validation error')
default_code = _('error')

781
spider/validator/fields.py Normal file
View File

@ -0,0 +1,781 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import six
import random
import string
import sys
import uuid
import re
import copy
import datetime
from collections import OrderedDict
from six.moves import urllib_parse as urlparse, range
from IPy import IP, MAX_IPV4_ADDRESS, MAX_IPV6_ADDRESS
from . import exceptions
from .utils import force_text
from .translation import gettext as _
__all__ = [
# Don't need to add field to here by hand,
# BaseFieldMetaClass will auto add field to here.
]
FIELDS_NAME_MAP = {
# Don't need to add field to here by hand,
# BaseFieldMetaClass will auto add field to here.
}
def create_field(field_info):
"""
Create a field by field info dict.
"""
field_type = field_info.get('type')
if field_type not in FIELDS_NAME_MAP:
raise ValueError(_('not support this field: {}').format(field_type))
field_class = FIELDS_NAME_MAP.get(field_type)
params = dict(field_info)
params.pop('type')
return field_class.from_dict(params)
class EmptyValue(object):
"""
a data type replace None
"""
def __init__(self):
pass
def __str__(self):
return '__empty_value__'
def __repr__(self):
return '<{}>'.format(self.__class__.__name__)
EMPTY_VALUE = EmptyValue()
class BaseFieldMetaClass(type):
def __new__(cls, name, bases, attrs):
__all__.append(name)
clazz = super(BaseFieldMetaClass, cls).__new__(cls, name, bases, attrs)
field_name = attrs.get('FIELD_TYPE_NAME')
if field_name is not None and field_name != 'object':
FIELDS_NAME_MAP[field_name] = clazz
return clazz
@six.add_metaclass(BaseFieldMetaClass)
class BaseField(object):
"""
BaseField
"""
"""
INTERNAL_TYPE is the type of the field in python internal, like str, int, list, dict
INTERNAL_TYPE can be a type list, such as [int, long]
INTERNAL_TYPE used to validate field's type by isinstance(value, INTERNAL_TYPE)
"""
INTERNAL_TYPE = object
FIELD_TYPE_NAME = 'object'
PARAMS = [
'strict', 'default', 'validators', 'required'
]
def __init__(self, strict=True, default=EMPTY_VALUE, validators=None, required=True, allow_null=False, **kwargs):
"""
:param strict: bool, if strict is True, value must be an instance of INTERVAL_TYPE,
otherwise, value should be convert to INTERNAL_TYPE
:param default: default value, defaults to EMPTY_VALUE
:param validators: a validator list, validator can be function, other callable object or object that have method named validate
:param required: bool, indicate that this field is whether required
"""
self.strict = strict
self.default = default
self.allow_null = allow_null
if validators is None:
validators = []
elif not isinstance(validators, (tuple, list)):
validators = [validators]
self.validators = validators
self.required = required
def __str__(self):
return self.__class__.__name__
@classmethod
def _check_value_range(cls, min_value, max_value):
if max_value is not None and max_value < min_value:
raise ValueError(_('the max value must greater than or equals the min value, got min value={min}, max value={max}').format(
min=min_value, max=max_value))
def _convert_type(self, value):
if isinstance(self.INTERNAL_TYPE, (tuple, list)):
for t in self.INTERNAL_TYPE:
try:
value = t(value)
break
except TypeError as e:
pass
else:
raise ValueError()
else:
value = self.INTERNAL_TYPE(value)
return value
@classmethod
def _get_all_params(cls):
"""
Collect all PARAMS from this class and its parent class.
"""
params = list(cls.PARAMS)
bases = cls.__bases__
for base in bases:
if issubclass(base, BaseField):
params.extend(base._get_all_params())
return params
def validate(self, value):
"""
return validated value or raise FieldValidationError.
"""
if not self.required:
return value
if not self.allow_null and value is None:
raise exceptions.FieldValidationError(_("value can't be 'null'."))
value = self._validate(value)
for v in self.validators:
v(value)
return value
def _validate(self, value):
"""
return validated value or raise FieldValidationError.
sub-class should override this method.
"""
return self._validate_type(value)
def _validate_type(self, value):
"""
validate the type of value
"""
if not isinstance(value, self.INTERNAL_TYPE):
if self.strict:
raise exceptions.FieldValidationError(
_('got a wrong type: {0}, expect {1}').format(type(value).__name__, self.FIELD_TYPE_NAME))
else:
try:
value = self._convert_type(value)
except (ValueError, TypeError) as e:
raise exceptions.FieldValidationError(
_('type convertion({0} -> {1}) is failed: {2}').format(type(value).__name__, self.FIELD_TYPE_NAME, str(e)))
return value
def is_required(self):
return self.required
def get_default(self):
"""
return default value
"""
if callable(self.default):
return self.default()
else:
return self.default
def to_presentation(self, value):
"""
value: must be a internal value
"""
return value
def to_internal(self, value):
"""
value: must be a validated value
"""
return value
def to_dict(self):
"""
to dict presentation
"""
d = {
'type': self.FIELD_TYPE_NAME,
}
params = self._get_all_params()
for name in params:
if hasattr(self, name):
value = getattr(self, name)
# 处理特殊值
if value is EMPTY_VALUE:
value = '__empty__'
d[name] = value
return d
@classmethod
def from_dict(cls, params):
"""
Create a field from params.
sub-class can override this method.
"""
if params.get('default') == '__empty__':
params['default'] = EMPTY_VALUE
return cls(**params)
def mock_data(self):
"""
reutrn mocking data
sub-class should override this method
"""
return 'this field doesnt implement mock_data method'
class StringField(BaseField):
"""
StringField
internal: six.string_types
presentation: string
"""
if six.PY2:
INTERNAL_TYPE = (unicode, str)
else:
INTERNAL_TYPE = str
FIELD_TYPE_NAME = 'string'
PARAMS = ['min_length', 'max_length', 'regex']
def __init__(self, min_length=0, max_length=None, regex=None, **kwargs):
if min_length < 0:
min_length = 0
self._check_value_range(min_length, max_length)
self.min_length = min_length
self.max_length = max_length
if isinstance(regex, six.string_types):
regex = re.compile(regex)
self.regex = regex
super(StringField, self).__init__(**kwargs)
def _validate(self, value):
value = self._validate_type(value)
if len(value) < self.min_length:
raise exceptions.FieldValidationError(
_('string is too short, min-length is {}').format(self.min_length))
if self.max_length and len(value) > self.max_length:
raise exceptions.FieldValidationError(
_('string is too long, max-length is {}').format(self.max_length))
if not self._match(value):
raise exceptions.FieldValidationError(
_('{0} not match {1}').format(self.regex.pattern, value))
return value
def _match(self, value):
if self.regex is None:
return True
else:
return self.regex.match(value) is not None
def to_internal(self, value):
if value is None:
return value
return six.text_type(value)
def mock_data(self):
min_ = self.min_length
max_ = self.max_length
if max_ is None:
max_ = min_ + 100
size = random.randint(min_, max_)
random_str = ''.join(
[random.choice(string.ascii_letters + string.digits) for _ in range(size)])
random_str = self.to_internal(random_str)
return random_str
class NumberField(BaseField):
if six.PY2:
INTERNAL_TYPE = (int, long, float)
else:
INTERNAL_TYPE = (int, float)
FIELD_TYPE_NAME = 'number'
PARAMS = ['min_value', 'max_value']
def __init__(self, min_value=None, max_value=None, **kwargs):
self._check_value_range(min_value, max_value)
self.min_value = min_value
self.max_value = max_value
super(NumberField, self).__init__(**kwargs)
def _validate(self, value):
value = self._validate_type(value)
if self.min_value is not None and value < self.min_value:
raise exceptions.FieldValidationError(
_('value is too small, min-value is {}').format(self.min_value))
if self.max_value is not None and value > self.max_value:
raise exceptions.FieldValidationError(
_('value is too big, max-value is {}').format(self.max_value))
return value
def mock_data(self):
min_ = self.min_value
if min_ is None:
min_ = 0
max_ = self.max_value
if max_ is None:
max_ = min_ + 1000
return random.uniform(min_, max_)
class IntegerField(NumberField):
INTERNAL_TYPE = int
FIELD_TYPE_NAME = 'integer'
PARAMS = []
def mock_data(self):
d = super(IntegerField, self).mock_data()
return int(d)
class FloatField(NumberField):
INTERNAL_TYPE = float
FIELD_TYPE_NAME = 'float'
PARAMS = []
class BoolField(BaseField):
INTERNAL_TYPE = bool
FIELD_TYPE_NAME = 'bool'
PARAMS = []
def mock_data(self):
return random.choice([True, False])
class UUIDField(BaseField):
INTERNAL_TYPE = uuid.UUID
FIELD_TYPE_NAME = 'UUID'
PARAMS = ['format']
SUPPORT_FORMATS = {
'hex': 'hex',
'str': '__str__',
'int': 'int',
'bytes': 'bytes',
'bytes_le': 'bytes_le'
}
def __init__(self, format='hex', **kwargs):
"""
format: what format used when to_presentation, supports 'hex', 'str', 'int', 'bytes', 'bytes_le'
"""
if format not in self.SUPPORT_FORMATS:
raise ValueError(_('not supports format: {}').format(format))
self.format = format
kwargs.setdefault('strict', False)
super(UUIDField, self).__init__(**kwargs)
def _validate(self, value):
value = self._validate_type(value)
return value
def to_presentation(self, value):
assert isinstance(value, self.INTERNAL_TYPE)
attr = getattr(value, self.SUPPORT_FORMATS[self.format])
if callable(attr):
return attr()
return attr
def mock_data(self):
return uuid.uuid4()
class MD5Field(StringField):
FIELD_TYPE_NAME = 'md5'
PARAMS = []
REGEX = r'[\da-fA-F]{32}'
def __init__(self, **kwargs):
kwargs['strict'] = True
super(MD5Field, self).__init__(min_length=32,
max_length=32,
regex=self.REGEX,
**kwargs)
def _validate(self, value):
try:
return super(MD5Field, self)._validate(value)
except exceptions.FieldValidationError as e:
raise exceptions.FieldValidationError(
_('Got wrong md5 value: {}').format(value))
def mock_data(self):
return ''.join([random.choice(string.hexdigits) for i in range(32)])
class SHAField(StringField):
FIELD_TYPE_NAME = 'sha'
SUPPORT_VERSION = [1, 224, 256, 384, 512]
PARAMS = ['version']
def __init__(self, version=256, **kwargs):
if version not in self.SUPPORT_VERSION:
raise ValueError(_('{0} not support, support versions are: {1}').format(
version, self.SUPPORT_VERSION))
if version == 1:
length = 40
else:
length = int(version / 8 * 2)
self.version = version
self.length = length
kwargs['strict'] = True
super(SHAField, self).__init__(min_length=length,
max_length=length,
regex=r'[\da-fA-F]{' +
str(length) + '}',
**kwargs)
def _validate(self, value):
try:
return super(SHAField, self)._validate(value)
except exceptions.FieldValidationError as e:
raise exceptions.FieldValidationError(
_('Got wrong sha{0} value: {1}').format(self.version, value))
def mock_data(self):
return ''.join([random.choice(string.hexdigits) for i in range(self.length)])
class EmailField(StringField):
FIELD_TYPE_NAME = 'email'
REGEX = r'^[a-zA-Z0-9.!#$%&\'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
PARAMS = []
def __init__(self, **kwargs):
kwargs['strict'] = True
super(EmailField, self).__init__(regex=self.REGEX, **kwargs)
def _validate(self, value):
try:
return super(EmailField, self)._validate(value)
except exceptions.FieldValidationError as e:
raise exceptions.FieldValidationError(
_('Got wrong email value: {}').format(value))
def mock_data(self):
name = ''.join(random.sample(string.ascii_lowercase, 5))
domain = '{0}.com'.format(
''.join(random.sample(string.ascii_lowercase, 3)))
return '{0}@{1}'.format(name, domain)
class IPAddressField(BaseField):
INTERNAL_TYPE = IP
FIELD_TYPE_NAME = 'ip_address'
PARAMS = ['version']
SUPPORT_VERSIONS = ['ipv4', 'ipv6', 'both']
def __init__(self, version='both', **kwargs):
if version not in self.SUPPORT_VERSIONS:
raise ValueError(_('{} version is not supported').format(version))
self.version = version
kwargs.setdefault('strict', False)
super(IPAddressField, self).__init__(**kwargs)
def _validate(self, value):
try:
value = IP(value)
except ValueError as e:
raise exceptions.FieldValidationError(str(e))
if self.version == 'ipv4' and value.version() != 4:
raise exceptions.FieldValidationError(
_('expected an ipv4 address, got {}').format(value.strNormal()))
if self.version == 'ipv6' and value.version() != 6:
raise exceptions.FieldValidationError(
-('expected an ipv6 address, got {}').format(value.strNormal()))
return value
def to_presentation(self, value):
return value.strNormal()
def mock_data(self):
v = self.version
if v == 'both':
v = random.choice(['ipv4', 'ipv6'])
if v == 'ipv4':
ip = random.randint(0, MAX_IPV4_ADDRESS)
return IP(ip)
else:
ip = random.randint(0, MAX_IPV6_ADDRESS)
return IP(ip)
class URLField(StringField):
FIELD_TYPE_NAME = 'url'
PARAMS = []
SCHEMAS = ('http', 'https')
def __init__(self, **kwargs):
kwargs['strict'] = True
super(URLField, self).__init__(min_length=0, **kwargs)
def _validate(self, value):
value = self._validate_type(value)
url = urlparse.urlparse(value)
if url.scheme not in self.SCHEMAS:
raise exceptions.FieldValidationError(_('schema is lost'))
if url.hostname == '':
raise exceptions.FieldValidationError(_('hostname is lost'))
return url.geturl()
def mock_data(self):
return 'http://www.example.com/media/image/demo.jpg'
class EnumField(BaseField):
INTERNAL_TYPE = object
FIELD_TYPE_NAME = 'enum'
PARAMS = ['choices']
def __init__(self, choices=None, **kwargs):
if choices is None or len(choices) == 0:
raise ValueError('choices cant be empty or None')
self.choices = choices
super(EnumField, self).__init__(**kwargs)
def _validate(self, value):
if value not in self.choices:
raise exceptions.FieldValidationError(
_('{!r} not in the choices').format(value))
return value
def mock_data(self):
return random.choice(self.choices)
class DictField(BaseField):
INTERNAL_TYPE = dict
FIELD_TYPE_NAME = 'dict'
PARAMS = ['validator']
def __init__(self, validator=None, **kwargs):
"""
:param validator: Validator object
"""
self.validator = validator
super(DictField, self).__init__(**kwargs)
def _validate(self, value):
value = self._validate_type(value)
if self.validator:
v = self.validator(value)
if v.is_valid():
value = v.validated_data
else:
raise exceptions.FieldValidationError(v.errors)
else:
value = copy.deepcopy(value)
return value
def to_dict(self):
d = super(DictField, self).to_dict()
if d['validator'] is not None:
d['validator'] = d['validator'].to_dict()
return d
def mock_data(self):
if self.validator:
return self.validator.mock_data()
else:
return {}
class ListField(BaseField):
INTERNAL_TYPE = (list, tuple)
FIELD_TYPE_NAME = 'list'
PARAMS = ['field', 'min_length', 'max_length']
def __init__(self, field=None, min_length=0, max_length=None, **kwargs):
if field is not None and not isinstance(field, BaseField):
raise ValueError(
_('field param expect a instance of BaseField, but got {!r}').format(field))
self.field = field
self._check_value_range(min_length, max_length)
self.min_length = min_length
self.max_length = max_length
super(ListField, self).__init__(**kwargs)
def _validate(self, value):
value = self._validate_type(value)
if self.min_length is not None and len(value) < self.min_length:
raise exceptions.FieldValidationError(
_('this list has too few elements, min length is {}').format(self.min_length))
if self.max_length is not None and len(value) > self.max_length:
raise exceptions.FieldValidationError(
_('this list has too many elements, max length is {}').format(self.max_length))
if self.field:
new_value = []
for item in value:
new_item = self.field.validate(item)
new_value.append(new_item)
value = new_value
else:
value = copy.deepcopy(value)
return value
def to_dict(self):
d = super(ListField, self).to_dict()
if d['field'] is not None:
d['field'] = d['field'].to_dict()
return d
@classmethod
def from_dict(cls, params):
if 'field' in params and isinstance(params['field'], dict):
params['field'] = create_field(params['field'])
return super(ListField, cls).from_dict(params)
def mock_data(self):
min_ = self.min_length
if min_ is None:
min_ = 0
max_ = self.max_length
if max_ is None:
max_ = 10
length = random.choice(range(min_, max_))
data = [None] * length
if self.field:
for i in range(length):
data[i] = self.field.mock_data()
return data
class TimestampField(IntegerField):
FIELD_TYPE_NAME = 'timestamp'
PARAMS = []
def __init__(self, **kwargs):
super(TimestampField, self).__init__(
min_value=0, max_value=2 ** 32 - 1, **kwargs)
def _validate(self, value):
try:
return super(TimestampField, self)._validate(value)
except exceptions.FieldValidationError as e:
raise exceptions.FieldValidationError(
_('Got wrong timestamp: {}').format(value))
class DatetimeField(BaseField):
INTERNAL_TYPE = datetime.datetime
FIELD_TYPE_NAME = 'datetime'
PARAMS = ['dt_format', 'tzinfo']
DEFAULT_FORMAT = '%Y/%m/%d %H:%M:%S'
def __init__(self, dt_format=None, tzinfo=None, **kwargs):
if dt_format is None:
dt_format = self.DEFAULT_FORMAT
self.dt_format = dt_format
if isinstance(tzinfo, six.string_types):
try:
import pytz
except ImportError as e:
raise ValueError(
_('Cant create DatetimeField instance with tzinfo {}, please install pytz and try again').format(params['tzinfo']))
tzinfo = pytz.timezone(tzinfo)
self.tzinfo = tzinfo
kwargs.setdefault('strict', False)
super(DatetimeField, self).__init__(**kwargs)
def _convert_type(self, value):
# override
if isinstance(value, six.string_types):
if value.isdigit():
value = int(value)
return self.INTERNAL_TYPE.fromtimestamp(value, tz=self.tzinfo)
else:
dt = self.INTERNAL_TYPE.strptime(value, self.dt_format)
if self.tzinfo:
dt = dt.replace(tzinfo=self.tzinfo)
return dt
elif isinstance(value, six.integer_types):
return self.INTERNAL_TYPE.fromtimestamp(value, tz=self.tzinfo)
else:
raise ValueError(_('Got wrong datetime value: {}').format(value))
def _validate(self, value):
value = self._validate_type(value)
return copy.copy(value)
def to_presentation(self, value):
return value.strftime(self.dt_format)
def to_dict(self):
d = super(DatetimeField, self).to_dict()
if d['tzinfo'] is not None:
d['tzinfo'] = force_text(d['tzinfo'])
return d
def mock_data(self):
return self.INTERNAL_TYPE.fromtimestamp(random.randint(0, 2 ** 32 - 1))
class DateField(BaseField):
INTERNAL_TYPE = datetime.date
FIELD_TYPE_NAME = 'date'
PARAMS = ['dt_format']
DEFAULT_FORMAT = '%Y/%m/%d'
def __init__(self, dt_format=None, **kwargs):
if dt_format is None:
dt_format = self.DEFAULT_FORMAT
self.dt_format = dt_format
kwargs.setdefault('strict', False)
super(DateField, self).__init__(**kwargs)
def _convert_type(self, value):
# override
if isinstance(value, six.string_types):
if value.isdigit():
value = int(value)
return self.INTERNAL_TYPE.fromtimestamp(value)
else:
dt = datetime.datetime.strptime(value, self.dt_format)
return dt.date()
elif isinstance(value, six.integer_types):
return self.INTERNAL_TYPE.fromtimestamp(value)
else:
raise ValueError()
def _validate(self, value):
value = self._validate_type(value)
return copy.copy(value)
def to_presentation(self, value):
return value.strftime(self.dt_format)
def mock_data(self):
return self.INTERNAL_TYPE.fromtimestamp(random.randint(0, 2 ** 32 - 1))

View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
import os
BASE_PATH = os.path.abspath(os.path.dirname(__file__))
ENV_LOCALE_DIR = 'PYTHON_VALIDATOR_LOCALE'
ENV_LANGUAGES = 'PYTHON_VALIDATOR_LANGUAGES'
DEFAULT_LOCALE_DIR = os.path.join(BASE_PATH, 'locale')
DOMAIN = 'python-validator'
domain = DOMAIN
localedir = os.environ.get(ENV_LOCALE_DIR, DEFAULT_LOCALE_DIR)
languages = os.environ.get(ENV_LANGUAGES)
if languages is not None:
try:
languages = languages.split(',')
except Exception:
languages = None
import gettext as _gettext
translation = _gettext.translation(domain, localedir, languages=languages, fallback=True)
def get_localedir():
return localedir
def gettext(s):
return translation.gettext(s)
def ngettext(singular, plural, n):
return translation.ngettext(singular, plural, n)
def lgettext(s):
return translation.lgettext(s)
def lngettext(singular, plural, n):
return translation.lngettext(singular, plural, n)

46
spider/validator/utils.py Normal file
View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import six
def force_text(s, encoding='utf8', errors='strict'):
if isinstance(s, six.text_type):
return s
if not issubclass(type(s), six.string_types):
if six.PY3:
if isinstance(s, bytes):
s = six.text_type(s, encoding, errors)
else:
s = six.text_type(s)
elif hasattr(s, '__unicode__'):
s = six.text_type(s)
else:
s = six.text_type(bytes(s), encoding, errors)
else:
# Note: We use .decode() here, instead of six.text_type(s, encoding,
# errors), so that if s is a SafeBytes, it ends up being a
# SafeText at the end.
s = s.decode(encoding, errors)
return s
def force_bytes(s, encoding='utf-8', errors='strict'):
if isinstance(s, six.binary_type):
if encoding == 'utf-8':
return s
else:
return s.decode('utf-8', errors).encode(encoding, errors)
if not isinstance(s, six.string_types):
try:
if six.PY3:
return six.text_type(s).encode(encoding)
else:
return bytes(s)
except UnicodeEncodeError:
return six.text_type(s).encode(encoding, errors)
else:
return s.encode(encoding, errors)
if six.PY3:
force_str = force_text
else:
force_str = force_bytes
force_unicode = force_text

View File

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import six
from . import exceptions
from .fields import BaseField, EMPTY_VALUE, create_field, DictField
from .utils import force_str
class ValidatorMetaClass(type):
def __new__(cls, cls_name, bases, attrs):
fields_map = dict()
parent_fields_map = dict()
for parent in bases:
if hasattr(parent, '_FIELDS_MAP'):
parent_fields_map.update(parent._FIELDS_MAP)
for name, value in six.iteritems(attrs):
if isinstance(value, BaseField):
fields_map[name] = value
for name in fields_map:
attrs.pop(name, None)
parent_fields_map.update(fields_map)
attrs['_FIELDS_MAP'] = parent_fields_map
return super(ValidatorMetaClass, cls).__new__(cls, cls_name, bases, attrs)
@six.add_metaclass(ValidatorMetaClass)
class Validator(object):
""" a data validator like Django ORM
"""
def __init__(self, raw_data):
"""
:param raw_data: unvalidate data
"""
assert isinstance(raw_data, dict), '"raw_data" must be a dict, not "{}"'.format(type(raw_data).__name__)
self.raw_data = raw_data
self.validated_data = None
self.errors = {}
def _validate(self):
data = {}
for name, field in six.iteritems(self._FIELDS_MAP):
value = self.raw_data.get(name, field.get_default())
if value is EMPTY_VALUE:
if field.is_required():
self.errors[name] = exceptions.FieldRequiredError()
continue
"""
# dont need to validate None
if value is None:
data[name] = None
continue"""
try:
validated_value = field.validate(value)
internal_value = field.to_internal(validated_value)
field_validator = getattr(
self, 'validate_{}'.format(name), None)
if field_validator and callable(field_validator):
field_validator(internal_value)
data[name] = internal_value
except exceptions.FieldValidationError as e:
self.errors[name] = e
if self.errors:
return
try:
data = self.validate(data)
except exceptions.ValidationError as e:
self.errors['__data_error__'] = e
if not self.errors:
self.validated_data = data
def is_valid(self, raise_error=False):
self._validate()
if raise_error and self.errors:
raise exceptions.ValidationError(self.errors)
return False if self.errors else True
def validate(self, data):
"""
model-level validate.
sub-class can override this method to validate data, return modified data
"""
return data
@property
def str_errors(self):
errors = dict()
for name, error in six.iteritems(self.errors):
errors[name] = error.get_detail()
return errors
@classmethod
def to_dict(cls):
"""
format Validator to dict
"""
d = dict()
for name, field in six.iteritems(cls._FIELDS_MAP):
field_info = field.to_dict()
d[name] = field_info
return d
@classmethod
def mock_data(cls):
"""
return random mocking data.
mocking data will be valid in most case, but it maybe can't pass from your own `validate` method or `validator`
"""
mocking_data = {}
for name, field in six.iteritems(cls._FIELDS_MAP):
mocking_data[name] = field.mock_data()
return mocking_data
def _format(self):
fields = []
for name, field in six.iteritems(self._FIELDS_MAP):
fields.append('{0}:{1}'.format(name, field.FIELD_TYPE_NAME))
fields = ','.join(fields)
if len(fields) > 103:
fields = fields[:100]
return '<{0}: {1}>'.format(self.__class__.__name__, fields)
def __str__(self):
return self._format()
def __repr__(self):
return self._format()
def create_validator(data_struct_dict, name=None):
"""
create a Validator instance from data_struct_dict
:param data_struct_dict: a dict describe validator's fields, like the dict `to_dict()` method returned.
:param name: name of Validator class
:return: Validator instance
"""
if name is None:
name = 'FromDictValidator'
attrs = {}
for field_name, field_info in six.iteritems(data_struct_dict):
field_type = field_info['type']
if field_type == DictField.FIELD_TYPE_NAME and isinstance(field_info.get('validator'), dict):
field_info['validator'] = create_validator(field_info['validator'])
attrs[field_name] = create_field(field_info)
name = force_str(name)
return type(name, (Validator, ), attrs)