RPC value typecasting method
This commit is contained in:
parent
eb5e861441
commit
24291804a2
23
rpc.py
23
rpc.py
@ -17,7 +17,16 @@ class RPCExcAuth(RPCExc): pass
|
|||||||
class RPCExcServerError(RPCExc): pass
|
class RPCExcServerError(RPCExc): pass
|
||||||
class RPCExcInvalidResult(RPCExc): pass
|
class RPCExcInvalidResult(RPCExc): pass
|
||||||
|
|
||||||
class MReference(str): pass
|
class RPCType(object):
|
||||||
|
LITERAL = b'0'
|
||||||
|
REFERENCE = b'1'
|
||||||
|
LIST = b'2'
|
||||||
|
GLOBAL = b'3'
|
||||||
|
EMPTY = b'4'
|
||||||
|
STREAM = b'5'
|
||||||
|
def __init__(self, value, magic=None):
|
||||||
|
self.magic = magic
|
||||||
|
self.value = value.value if isinstance(value, RPCType) else value
|
||||||
|
|
||||||
RecordServerInfo = namedtuple('RecordServerInfo', ('server', 'volume', 'uci', 'device', 'attempts', 'skip_signon_screen', 'domain', 'production'))
|
RecordServerInfo = namedtuple('RecordServerInfo', ('server', 'volume', 'uci', 'device', 'attempts', 'skip_signon_screen', 'domain', 'production'))
|
||||||
|
|
||||||
@ -27,20 +36,20 @@ def s_pack(value: Any, encoding: str='latin-1'):
|
|||||||
return bytes((len(encoded),)) + encoded
|
return bytes((len(encoded),)) + encoded
|
||||||
raise ValueError('cannot s-pack string longer than 255 bytes: ' + repr(value))
|
raise ValueError('cannot s-pack string longer than 255 bytes: ' + repr(value))
|
||||||
|
|
||||||
def l_pack(value: Any, envelope: int=3, wrapped: bool=True, basictype=b'0', encoding: str='latin-1'):
|
def l_pack(value: Any, envelope: int=3, wrapped: bool=True, magic=None, encoding: str='latin-1'):
|
||||||
if isinstance(value, dict):
|
if isinstance(value, dict):
|
||||||
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in value.items())
|
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in value.items())
|
||||||
return (b'2' + bare + b'f') if wrapped else bare
|
return ((magic or b'2') + bare + b'f') if wrapped else bare
|
||||||
elif not isinstance(value, str) and hasattr(value, '__iter__'):
|
elif not isinstance(value, str) and hasattr(value, '__iter__'):
|
||||||
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in enumerate(value, start=1))
|
bare = b't'.join(l_pack(k, envelope=envelope, wrapped=False, encoding=encoding) + l_pack(v, envelope=envelope, wrapped=False, encoding=encoding) for k, v in enumerate(value, start=1))
|
||||||
return (b'2' + bare + b'f') if wrapped else bare
|
return ((magic or b'2') + bare + b'f') if wrapped else bare
|
||||||
elif isinstance(value, MReference):
|
elif isinstance(value, RPCType):
|
||||||
return l_pack(str(value), envelope=envelope, basictype=b'1', encoding=encoding)
|
return l_pack(value.value, envelope=envelope, magic=value.magic, encoding=encoding)
|
||||||
else:
|
else:
|
||||||
encoded = str(value).encode(encoding)
|
encoded = str(value).encode(encoding)
|
||||||
if len(encoded) <= 10**envelope - 1:
|
if len(encoded) <= 10**envelope - 1:
|
||||||
bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded
|
bare = str(len(encoded)).zfill(envelope).encode(encoding) + encoded
|
||||||
return (basictype + bare + b'f') if wrapped else bare
|
return ((magic or b'0') + bare + b'f') if wrapped else bare
|
||||||
raise ValueError(f'cannot l-pack string longer than {10**envelope - 1} bytes with an envelope of {envelope}: ' + repr(value))
|
raise ValueError(f'cannot l-pack string longer than {10**envelope - 1} bytes with an envelope of {envelope}: ' + repr(value))
|
||||||
|
|
||||||
def l_pack_maxlen(value: Any, encoding: str='latin-1'):
|
def l_pack_maxlen(value: Any, encoding: str='latin-1'):
|
||||||
|
Loading…
Reference in New Issue
Block a user