ruse-py/ruse.py

322 lines
16 KiB
Python
Raw Normal View History

2019-11-01 08:15:30 -04:00
#!/usr/bin/env python3
import re
import json
import weakref
import random
from itertools import groupby
from collections import namedtuple
from collections import ChainMap
from types import GeneratorType
# Root environment mapping. It is captured as the default `envp`/`envx`
# argument by the functions defined below, so it must exist before them.
global_env = ChainMap()
def throw(ex, token=None, sourcefile=None, lineno=None, column=None):
    """Raise *ex*, prefixing its message with a source location when known.

    Args:
        ex: Exception instance to raise.
        token: Optional object with ``sourcefile``/``lineno``/``column``
            attributes; when an attribute is present it takes precedence over
            the matching keyword argument.
        sourcefile: Fallback source name.
        lineno: Fallback line number.
        column: Fallback column number.

    Raises:
        ex itself when no location information was supplied; otherwise a new
        exception of the same type whose single argument is
        ``"<sourcefile>:<lineno>:<column>: <message>"`` (falsy parts omitted).
    """
    if not (token or sourcefile or lineno or column):
        raise ex

    sourcefile = getattr(token, 'sourcefile', sourcefile)
    lineno = getattr(token, 'lineno', lineno)
    column = getattr(token, 'column', column)

    prefix = ''
    if sourcefile:
        # Escape non-printable/non-ASCII characters so the prefix stays on one line.
        prefix += f'{sourcefile}:'.encode('unicode-escape').decode('utf-8')
    if lineno:
        prefix += f'{lineno}:'
    if column:
        prefix += f'{column}:'

    # An exception constructed without arguments has an empty ``args`` tuple;
    # indexing it would replace the intended error with an IndexError.
    detail = ex.args[0] if ex.args else ''
    raise type(ex)(f'{prefix} {detail}')
# Lexical token: the matched text, the name of the regex group that matched
# it, and the source position where it was found.
Token = namedtuple('Token', ('str', 'type', 'sourcefile', 'lineno', 'column'))
# Placeholder token with no position, used as the fallback when an expression
# has no originating source token.
Token.default = Token('', '', '<script>', None, None)
def tokenize(data, sourcefile=None, lineno=1, re_token=re.compile(r'((?P<comment>#|#[^\S\n].*?|;.*?)$|(?P<abbr>(?:#|\'|`|,@|,)(?=[\)\]\}\S]))|(?P<atom>"(?:\\.|[^"\\])*"|[^\s\(\)\[\]\{\}"\'`#;]+)|(?P<left>[\(\[\{])|(?P<right>[\)\]\}]))\s*', flags=re.MULTILINE), re_space=re.compile(r'\s+'), _token=None):
    """Yield a Token for every lexeme found in *data*, line by line.

    Args:
        data: Either a string (split with ``splitlines``) or any iterable of
            lines, such as an open file.
        sourcefile: Name reported in tokens; falls back to ``data.name`` and
            then to ``'<script>'``.
        lineno: Number assigned to the first line.
        re_token: Compiled pattern whose named groups define the token types.
        re_space: Compiled pattern used to skip leading whitespace.
        _token: Text of the most recent token, used only in error messages.

    Raises:
        SyntaxError: (via ``throw``) when a line contains text that no token
            pattern matches.
    """
    sourcefile = sourcefile or getattr(data, 'name', None) or '<script>'
    lines = data.splitlines() if isinstance(data, str) else data
    for lineno, line in enumerate(lines, start=lineno):
        leading = re_space.match(line)
        pos = len(leading.group()) if leading else 0

        found = re_token.match(line, pos)
        while found:
            _token = found.group(1)
            # Exactly one named alternative takes part in a match; its name is the type.
            kind = next(name for name, text in found.groupdict().items() if text is not None)
            yield Token(str=_token, type=kind, sourcefile=sourcefile, lineno=lineno, column=pos + 1)
            pos = found.end()
            found = re_token.match(line, pos)

        # Whatever could not be matched is reported with whitespace collapsed.
        leftover = re.sub(r'\s+', ' ', line[pos:].strip())
        if len(leftover) > 0:
            if _token:
                message = f'invalid content after token {dump(_token)}: {leftover[:50]}'
            else:
                message = f'invalid content: {leftover[:50]}'
            throw(SyntaxError(message), sourcefile=sourcefile, lineno=lineno, column=pos + 1)
# Maps each opening bracket to the closing bracket that must terminate it.
parenmap = {'(': ')', '[': ']', '{': '}'}
def parse(tokens, cur=None, right=None):
    """Yield the expressions read from the shared token iterator *tokens*.

    Nested lists are parsed by recursive calls that keep consuming the same
    iterator, so *tokens* must be an iterator (e.g. the ``tokenize``
    generator), not a re-iterable sequence.

    Args:
        tokens: Iterator of Token objects.
        cur: Token that opened the current context (opening bracket or
            abbreviation); used to locate errors when input ends early.
        right: Closing bracket that ends this level, or None at top level.

    Raises:
        SyntaxError: (via ``throw``) on a mismatched or unexpected closing
            bracket, on input that ends before *right* is seen, or on an
            abbreviation that is not followed by an expression.
    """
    missing = object()  # sentinel: None is a legitimate parsed value
    for cur in tokens:
        if cur.type == 'right':
            if cur.str == right:
                return
            throw(SyntaxError(f'expected closing parenthesis "{right}" instead of "{cur.str}"' if right else f'unexpected closing parenthesis "{cur.str}"'), token=cur)
        elif cur.type == 'left':
            yield SExp(parse(tokens, cur=cur, right=parenmap[cur.str]), token=cur)
        elif cur.type == 'atom':
            yield atom(cur.str)
        elif cur.type == 'abbr':
            # A bare next() would raise StopIteration when the input ends right
            # after the abbreviation; inside a generator that is converted to
            # RuntimeError, hiding the real problem from the user.
            datum = next(parse(tokens, cur=cur), missing)
            if datum is missing:
                throw(SyntaxError(f'expected expression after "{cur.str}"'), token=cur)
            yield apply_abbr(cur.str, datum)
        # Any other token type (comments) is skipped.
    right is None or throw(SyntaxError(f'expected closing parenthesis "{right}" after expression'), token=cur)
def atom(token):
    """Convert atom text to a value: a JSON literal if it parses, else a Symbol."""
    try:
        value = json.loads(token)
    except ValueError:
        # Not valid JSON (json.JSONDecodeError is a ValueError): treat as an identifier.
        value = Symbol(token)
    return value
class Symbol(str):
    """Interned identifier: equal names share a single Symbol object.

    Instances are callable so that the evaluator can treat a symbol like any
    other compiled form and look it up in an environment.
    """

    # Live symbols by name; entries disappear once nothing else references them.
    _registry = weakref.WeakValueDictionary()

    def __new__(cls, symbol):
        interned = cls._registry.get(symbol)
        if interned is None:
            interned = str.__new__(cls, symbol)
            # The local name keeps the new object alive until it is returned.
            cls._registry[symbol] = interned
        return interned

    def __call__(self, envx=global_env):
        """Return the symbol's value in *envx*; ``:keyword`` symbols evaluate to themselves."""
        if self.startswith(':'):
            return self
        return envx[self]

    def __repr__(self):
        return str(self)
def apply_abbr(abbr, datum, sym_quote=Symbol('quote'), sym_quasiquote=Symbol('quasiquote'), sym_unquote=Symbol('unquote'), sym_unquote_splicing=Symbol('unquote-splicing')):
    """Expand a reader abbreviation applied to the already-parsed *datum*.

    ``#`` before a symbol yields the symbol itself for ``:keyword`` symbols,
    otherwise the constant for ``t``/``f``/``n`` (True/False/None). The quote
    family wraps *datum* in a two-element SExp headed by the matching symbol.

    Raises:
        KeyError: for ``#`` followed by a symbol other than ``t``, ``f``, ``n``
            or a ``:keyword``.
        NotImplementedError: for any other unsupported combination.
    """
    if abbr == '#' and isinstance(datum, Symbol):
        if datum.startswith(':'):
            return datum
        return {'t': True, 'f': False, 'n': None}[datum]

    heads = {
        '\'': sym_quote,
        '`': sym_quasiquote,
        ',': sym_unquote,
        ',@': sym_unquote_splicing,
    }
    head = heads.get(abbr)
    if head is None:
        raise NotImplementedError(f'{abbr}{datum}')
    # Reuse the datum's own token (if it has one) so errors point at the datum.
    return SExp((head, datum), token=getattr(datum, 'token', Token.default))
class SExp(tuple):
    """Tuple-based S-expression that remembers the token which opened it.

    The class also owns the table of special-form compilers (``ops``) and the
    ``compile`` entry point that dispatches on it.
    """

    # Special-form name (Symbol, or None for a plain call) -> compiler function.
    ops = {}

    def __new__(cls, data, token=Token.default):
        """Wrap a plain tuple or generator; return anything else unchanged."""
        wrappable = isinstance(data, (tuple, GeneratorType)) and not isinstance(data, SExp)
        if not wrappable:
            return data
        self = tuple.__new__(cls, data)
        self.token = token
        self.left = token.str
        self.right = parenmap.get(token.str)
        return self

    def __getitem__(self, idx):
        """Index like a tuple, but return slices as SExp instances."""
        item = tuple.__getitem__(self, idx)
        if not isinstance(idx, slice):
            return item
        # A slice takes the token of its first element when that has one.
        if len(item):
            token = getattr(item[0], 'token', self.token)
        else:
            token = self.token
        return SExp(item, token=token)

    def __repr__(self):
        inner = ' '.join(dump(arg) for arg in self)
        return f"{self.left or ''}{inner}{self.right or ''}"

    @classmethod
    def implement(cls, *names):
        """Return a decorator registering a function as compiler for *names*."""
        def register(func):
            for name in names:
                key = Symbol(name) if isinstance(name, str) else name
                cls.ops[key] = func
            return func
        return register

    @classmethod
    def compile(cls, expr, envp=global_env):
        """Compile *expr* into a constant or a callable taking ``envx``.

        Non-tuples are constants and are returned as-is; an empty tuple
        compiles to None. Otherwise the head selects a registered special
        form, a Macro bound in *envp* (expanded and compiled again), or the
        generic call compiler registered under None.
        """
        if not isinstance(expr, tuple):
            return expr
        if len(expr) < 1:
            return None

        op = expr[0]
        if op in cls.ops:
            res = cls.ops[op](expr, envp=envp)
        elif isinstance(op, Symbol) and (op in envp) and isinstance(envp[op], Macro):
            expansion = envp[op](*expr[1:])
            res = cls.compile(expansion, envp=envp)
        else:
            res = cls.ops[None](expr, envp=envp)

        if callable(res):
            # Remember the source form for error reporting and dump().
            res.expr = expr
        return res
@SExp.implement('quote')
def compile_quote(expr, envp=global_env):
    """Compile ``(quote datum)`` into a callable that returns *datum* unevaluated."""
    if len(expr) != 2:
        throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)

    def run(envx=global_env):
        return expr[1]
    return run
@SExp.implement('if')
def compile_if(expr, envp=global_env):
    """Compile ``(if test consequent [alternative])``.

    The chosen branch is returned as a ``tailcall`` so the caller's evaluation
    loop runs it; a missing alternative yields None.
    """
    if len(expr) not in {3, 4}:
        throw(SyntaxError(f'expected 2 or 3 arguments in {expr}'), token=expr.token)

    xargs = tuple(SExp.compile(arg, envp=envp.new_child()) for arg in expr[1:])
    test, consequent = xargs[0], xargs[1]
    alternative = xargs[2] if len(xargs) > 2 else None

    def run(envx=global_env):
        branch = consequent if evaluate(test, envx=envx) else alternative
        return tailcall(runx=branch, envx=envx)
    return run
@SExp.implement('cond')
def compile_cond(expr, envp=global_env, sym_else=Symbol('else')):
    """Compile ``(cond (predicate value)*)``.

    At run time the value of the first clause whose predicate is the symbol
    ``else`` or evaluates truthy is returned as a ``tailcall``; if no clause
    matches the result is None.
    """
    well_formed = all(isinstance(arg, SExp) and len(arg) == 2 for arg in expr[1:])
    if not well_formed:
        throw(SyntaxError(f'expected (predicate value) pairs in {expr}'), token=expr.token)

    clauses = tuple(
        (SExp.compile(predicate, envp=envp.new_child()), SExp.compile(value, envp=envp.new_child()))
        for predicate, value in expr[1:]
    )

    def run(envx=global_env):
        for test, outcome in clauses:
            if test is sym_else or evaluate(test, envx):
                return tailcall(runx=outcome, envx=envx)
    return run
@SExp.implement('set!')
def compile_set(expr, envp=global_env):
    """Compile ``(set! name value)``: rebind *name* in the scope that already defines it."""
    if len(expr) != 3:
        throw(SyntaxError(f'expected 2 arguments in {expr}'), token=expr.token)
    op, name, body = expr
    if not isinstance(name, Symbol):
        throw(SyntaxError(f'{expr} <-- could only set! a symbol'), token=expr.token)
    xbody = SExp.compile(body, envp=envp.new_child())

    def run(envx=global_env):
        # Locate the defining scope first, so an unknown name fails before the
        # value expression is evaluated.
        scope = env_find(envx, name)
        scope[name] = evaluate(xbody, envx=envx)
    return run
@SExp.implement('define', 'define-macro')
def compile_define(expr, envp=global_env, sym_define_macro=Symbol('define-macro'), sym_lambda=Symbol('lambda')):
    """Compile ``(define name value)`` and ``(define-macro name value)``.

    The shorthand ``(define (name args*) body+)`` is rewritten to
    ``(define name (lambda (args*) body+))`` and compiled again.

    ``define`` compiles to a callable that binds *name* in the run-time
    environment. ``define-macro`` evaluates its value immediately, turns the
    resulting Closure into a Macro, binds it in the compile-time environment
    *envp* and compiles to None.

    Raises:
        SyntaxError: (via ``throw``) on a malformed form, a non-symbol name, or
            a macro value that is not a procedure.
    """
    len(expr) >= 3 or throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
    op, name, *body = expr

    if isinstance(name, SExp):
        # Checked here (not in the condition above) so an empty header is
        # reported with the message written for it.
        len(name) >= 1 or throw(SyntaxError(f'expected (define (name args*) body+) instead of {expr}'), token=expr.token)
        procedure = SExp((sym_lambda, name[1:], *body), token=expr.token)
        return SExp.compile(SExp((op, name[0], procedure), token=expr.token), envp=envp)

    len(expr) == 3 or throw(SyntaxError(f'expected 2 arguments in {expr}'), token=expr.token)
    isinstance(name, Symbol) or throw(SyntaxError(f'could only define a symbol; got {expr} instead'), token=expr.token)
    xvalue = SExp.compile(expr[2], envp=envp.new_child())

    if op is sym_define_macro:
        # Go through evaluate() rather than calling xvalue directly: the
        # compiled value may be a constant (not callable) or may hand back a
        # tailcall that still has to be run to obtain the closure.
        value = evaluate(xvalue, envx=envp.new_child())
        isinstance(value, Closure) or throw(SyntaxError(f'macro must be a procedure; got {expr} instead'), token=expr.token)
        value.__class__ = Macro
        envp[name] = value  # visible to forms compiled after this one
        return None

    return lambda envx=global_env: envx.__setitem__(name, evaluate(xvalue, envx=envx))
@SExp.implement('begin')
def compile_begin(expr, envp=global_env):
    """Compile ``(begin exprs*)``; an empty ``begin`` compiles to None.

    All but the last expression are evaluated for effect; the last one is
    returned as a ``tailcall``.
    """
    # Compiled eagerly and in order, in the same compile-time environment.
    steps = tuple(SExp.compile(arg, envp=envp) for arg in expr[1:])
    if len(steps) == 0:
        return None
    leading, final = steps[:-1], steps[-1]

    def run(envx=global_env):
        for step in leading:
            evaluate(step, envx=envx)
        return tailcall(runx=final, envx=envx)
    return run
@SExp.implement('lambda')
def compile_lambda(expr, envp=global_env, sym_begin=Symbol('begin')):
    """Compile ``(lambda params body+)`` into a callable that builds a Closure.

    *params* is either a tuple of symbols or a single symbol. Several body
    expressions are wrapped in an implicit ``begin``.
    """
    if len(expr) < 3:
        throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
    op, names, *body = expr

    names_ok = isinstance(names, Symbol) or (
        isinstance(names, tuple) and all(isinstance(name, Symbol) for name in names)
    )
    if not names_ok:
        throw(SyntaxError(f'invalid argument list in {expr}'), token=expr.token)

    if len(body) == 1:
        form = body[0]
    else:
        form = SExp((sym_begin, *body), token=expr.token)
    xbody = SExp.compile(form, envp=envp.new_child())

    def make_closure(envx=global_env):
        # The closure captures the environment in which the lambda is evaluated.
        return Closure(names, xbody, envx=envx)
    return make_closure
@SExp.implement('let')
def compile_let(expr, envp=global_env, sym_lambda=Symbol('lambda')):
    """Compile ``(let ((name value)*) body+)`` as ``((lambda (names*) body+) values*)``."""
    if len(expr) < 3:
        throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
    op, bindings, *body = expr

    bindings_ok = all(
        isinstance(b, tuple) and len(b) == 2 and isinstance(b[0], Symbol)
        for b in bindings
    )
    if not bindings_ok:
        throw(SyntaxError(f'invalid binding list in {expr}'), token=expr.token)

    if len(bindings) > 0:
        names, values = zip(*bindings)
    else:
        names, values = (), ()

    procedure = SExp((sym_lambda, names, *body), token=expr.token)
    application = SExp((procedure, *values), token=expr.token)
    return SExp.compile(application, envp=envp)
@SExp.implement('quasiquote')
def compile_quasiquote(expr, envp=global_env):
    """Compile ``(quasiquote template)`` by expanding it into quote/cons/append forms."""
    if len(expr) != 2:
        throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)
    expansion = compile_quasiquote_expand(expr[1])
    return SExp.compile(expansion, envp=envp)
def compile_quasiquote_expand(expr, sym_unquote=Symbol('unquote'), sym_unquote_splicing=Symbol('unquote-splicing'), sym_append=Symbol('append'), sym_cons=Symbol('cons'), sym_quote=Symbol('quote')):
    """Rewrite a quasiquote template into an ordinary expression.

    * an atom or empty list ``x`` becomes ``(quote x)``
    * ``(unquote x)`` becomes ``x``
    * a list starting with ``(unquote-splicing x)`` becomes ``(append x <rest>)``
    * any other list becomes ``(cons <first> <rest>)``

    Raises:
        SyntaxError: (via ``throw``) for ``unquote-splicing`` in head position
            or for unquote forms with the wrong number of arguments.
    """
    if not (isinstance(expr, SExp) and len(expr) > 0):
        return SExp((sym_quote, expr), token=getattr(expr, 'token', Token.default))

    head = expr[0]
    if head is sym_unquote_splicing:
        throw(SyntaxError(f'invalid comma splice {expr}'), token=expr.token)

    if head is sym_unquote:
        if len(expr) != 2:
            throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)
        return expr[1]

    head_token = getattr(head, 'token', expr.token)
    if isinstance(head, SExp) and len(head) > 0 and head[0] is sym_unquote_splicing:
        if len(head) != 2:
            throw(SyntaxError(f'expected 1 argument in {head}'), token=expr.token)
        return SExp((sym_append, head[1], compile_quasiquote_expand(expr[1:])), token=head_token)

    return SExp((sym_cons, compile_quasiquote_expand(head), compile_quasiquote_expand(expr[1:])), token=head_token)
@SExp.implement(None)
def compile_call(expr, envp=global_env):
    """Compile a procedure application ``(operator operands*)``.

    At run time every element is evaluated left to right. A Closure operator
    is continued through a ``tailcall`` on its body; any other callable is
    invoked directly and a tuple/generator result is wrapped in an SExp.
    """
    xargs = tuple(SExp.compile(arg, envp=envp.new_child()) for arg in expr)

    def run(envx=global_env):
        values = tuple(evaluate(arg, envx=envx) for arg in xargs)
        func = values[0]
        operands = values[1:]
        if isinstance(func, Closure):
            return tailcall(runx=func.body, envx=func.child_env(operands))
        return SExp(func(*operands), token=expr.token)
    return run
class Closure(object):
    """User-defined procedure: parameters, compiled body and defining environment."""

    def __init__(self, params, body, envx):
        self.params = params
        self.body = body
        self.envx = envx

    def __call__(self, *args):
        """Run the body to completion with *args* bound to the parameters."""
        return evaluate(self.body, envx=self.child_env(args))

    def child_env(self, args):
        """Return a child of the captured environment with the arguments bound.

        A single-symbol parameter list receives all arguments as one tuple;
        otherwise parameters and arguments are paired positionally with zip.
        """
        if isinstance(self.params, Symbol):
            frame = {self.params: args}
        else:
            frame = dict(zip(self.params, args))
        return self.envx.new_child(frame)
# Marker subclass: SExp.compile expands a call whose operator is bound to a
# Macro at compile time instead of compiling it as an ordinary call.
class Macro(Closure): pass
# Deferred evaluation request: evaluate() unpacks it into the next form
# (runx) and environment (envx) to run, instead of recursing.
tailcall = namedtuple('tailcall', ('runx', 'envx'))
def func_gensym(base='g', start=1000000, stop=9999999):
    """Return a Symbol named *base* plus a random number that is not currently interned."""
    while True:
        suffix = str(random.randrange(start, stop))
        if base + suffix not in Symbol._registry:
            return Symbol(base + suffix)
def func_call_with_current_continuation(proc):
    """Call *proc* with an escape procedure and return its result.

    Invoking the escape procedure with a value aborts *proc* and makes this
    call return that value. The escape is implemented by raising a private
    RuntimeWarning instance, so it only works while this call is still active.
    """
    signal = RuntimeWarning('Cannot continue this continuation anymore.')

    def escape(value):
        signal.value = value
        raise signal

    try:
        result = proc(escape)
    except RuntimeWarning as caught:
        if caught is not signal:
            # Belongs to another continuation (or is a genuine warning): pass it on.
            raise caught
        result = signal.value
    return result
def environment(env=global_env):
    """Populate *env* with the built-in procedures and return it.

    Every public and private name of the ``math`` module is added first, then
    the interpreter's own primitives (which take precedence on name clashes).

    With a single argument ``-`` negates and ``/`` takes the reciprocal;
    with more arguments they subtract / divide left to right.
    """
    import math
    env.update(vars(math))
    env.update({
        'assert=': lambda a, b: a == b or throw(AssertionError(f'{a} != {b}')),
        'display': lambda *args: print(*args), 'log': lambda a: (a, print(a))[0],
        # Unary forms: (- x) is the negation, (/ x) the reciprocal.
        '+': lambda *args: sum(args), '-': lambda a, *b: a - sum(b) if b else -a,
        '*': lambda *args: math.prod(args), '/': lambda a, *b: a / math.prod(b) if b else 1 / a,
        '=': lambda a, b: a == b, '!=': lambda a, b: a != b,
        '<': lambda a, b: a < b, '>': lambda a, b: a > b,
        '<=': lambda a, b: a <= b, '>=': lambda a, b: a >= b,
        'not': lambda a: not a,
        'length': lambda a: len(a),
        'cons': lambda a, b: (a,) + tuple(b), 'append': lambda *args: (i for obj in args for i in obj),
        'list': lambda *args: args, 'list?': lambda a: isinstance(a, tuple), 'list-has': lambda obj, v: v in obj,
        'list-ref': lambda obj, i: obj[i], 'list-tail': lambda obj, i: obj[i:],
        # dict-add returns a modified copy and leaves the original untouched.
        'dict': lambda *args: dict(args), 'dict-has': lambda obj, k: k in obj, 'dict-get': lambda obj, k, d=None: obj.get(k, d), 'dict-add': lambda obj, k, v: (obj := obj.copy(), obj.__setitem__(k, v))[0],
        'atom?': lambda a: not isinstance(a, tuple),
        'null?': lambda a: a == (),
        'call/cc': func_call_with_current_continuation,
        'gensym': func_gensym
    })
    return env
# Fill the root environment with the built-ins. environment() updates the
# mapping in place and returns it, so the defaults captured earlier see them too.
global_env = environment(global_env)
def env_find(env, key):
    """Return the innermost mapping in ``env.maps`` that contains *key*.

    Raises:
        KeyError: if no mapping contains *key*.
    """
    owner = next((scope for scope in env.maps if key in scope), None)
    if owner is None:
        raise KeyError(key)
    return owner
# Raised by evaluate() to report a run-time error together with the source
# position of the form that failed.
class EvalException(RuntimeError): pass
def evaluate(runx, envx=global_env):
    """Run a compiled form to completion and return its value.

    Callables (compiled forms and Symbols) are invoked with ``envx``. When a
    call returns a ``tailcall`` the loop continues with its form and
    environment instead of recursing. Anything that is not callable is a
    constant and is returned unchanged.

    Raises:
        EvalException: wrapping any other exception raised by a form that
            carries an ``expr`` attribute, prefixed with that form's source
            position.
        RuntimeWarning: passed through untouched (used by call/cc escapes).
    """
    while callable(runx):
        try:
            res = runx(envx=envx)
        except (RuntimeWarning, EvalException):
            raise
        except Exception as ex:
            if not hasattr(runx, 'expr'):
                # No source form to report: re-raise unchanged. (``raise ex from ex``
                # would make the exception its own cause.)
                raise
            token = runx.expr.token
            # Exceptions created without arguments have an empty ``args``.
            detail = ex.args[0] if ex.args else ''
            raise EvalException(f'{token.sourcefile.encode("unicode-escape").decode("utf-8")}:{token.lineno}:{token.column}: {ex.__class__.__name__}: {detail}') from ex
        if isinstance(res, tailcall):
            runx, envx = res  # continue with the deferred form
        else:
            return res
    return runx  # constant
def execute(script, envx=global_env, sourcefile=None, command=False, _result=None):
    """Tokenize, parse, compile and evaluate *script* in *envx*.

    Args:
        script: Source text or an iterable of lines (e.g. an open file).
        envx: Environment used for both compilation and evaluation.
        sourcefile: Name used in error messages.
        command: When true, the whole input is treated as the elements of a
            single expression (no surrounding brackets needed) and its value
            is returned.
        _result: Value returned when the script contains no expressions.

    Returns:
        The value of the last expression evaluated (or of the single command).
    """
    if command:
        return evaluate(SExp.compile(SExp(parse(tokenize(script, sourcefile=sourcefile or '<command>'))), envp=envx), envx=envx)
    for expr in parse(tokenize(script, sourcefile=sourcefile)):
        # Evaluate in the same environment the form was compiled against;
        # omitting envx here would silently fall back to global_env.
        _result = evaluate(SExp.compile(expr, envp=envx), envx=envx)
    return _result
def load(script, sourcefile=None):
    """Parse *script* without evaluating it and return all its expressions as one SExp."""
    tokens = tokenize(script, sourcefile=sourcefile)
    expressions = parse(tokens)
    return SExp(expressions)
def dump(expr):
    """Return the textual form of a value.

    S-expressions and symbols use their own ``repr``; a callable is shown as
    the source form stored in its ``expr`` attribute when it has one, else by
    ``repr``; everything else is serialised with ``json.dumps``.
    """
    if isinstance(expr, (SExp, Symbol)):
        return repr(expr)
    if not callable(expr):
        return json.dumps(expr)
    if hasattr(expr, 'expr'):
        return dump(expr.expr)
    return repr(expr)
# Public API. Entries must be names (strings): a list of the objects
# themselves makes ``from <module> import *`` fail with TypeError.
__all__ = ['tokenize', 'parse', 'evaluate', 'execute', 'load', 'dump', 'SExp']
if __name__ == '__main__':
    # Interactive / piped read-eval-print loop.
    import sys
    try:
        import readline  # imported only for its side effect on input()
    except ModuleNotFoundError:
        pass

    sys_stdin_isatty = sys.stdin.isatty()
    if sys_stdin_isatty:
        import platform
        version_line = 'Python ' + ' '.join(sys.version.split())
        print('Ruse 0.0.1 (a little Scheme)', version_line, '=' * len(version_line), sep='\n')

    # Load the standard library, then any files named on the command line.
    for fname in ['stdlib.scm'] + sys.argv[1:]:
        with open(fname) as f:
            execute(f)

    def readlines(prompt=''):
        """Yield lines from input() forever; end of input surfaces as EOFError."""
        while True:
            yield input(prompt)

    prompt = ']=> ' if sys_stdin_isatty else ''
    while True:
        try:
            for expr in parse(tokenize(readlines(prompt), sourcefile='<stdin>')):
                res = evaluate(SExp.compile(expr))
                if sys_stdin_isatty:
                    print(f';{type(res).__name__}: {dump(res)}')
                else:
                    print(dump(res))
        except (EOFError, KeyboardInterrupt):
            if sys_stdin_isatty:
                print()
            break
        except Exception as ex:
            # Report the error and restart reading with a fresh tokenizer.
            if isinstance(ex, EvalException):
                print(f';× {ex}')
            else:
                print(f';× {type(ex).__name__}: {ex}')