ruse-py/ruse.py
2019-11-01 07:15:30 -05:00

322 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import re
import json
import weakref
import random
from itertools import groupby
from collections import namedtuple
from collections import ChainMap
from types import GeneratorType
global_env = ChainMap()
def throw(ex, token=None, sourcefile=None, lineno=None, column=None):
raise type(ex)(f"{f'{sourcefile}:'.encode('unicode-escape').decode('utf-8') if (sourcefile := getattr(token, 'sourcefile', sourcefile)) else ''}{f'{lineno}:' if (lineno := getattr(token, 'lineno', lineno)) else ''}{f'{column}:' if (column := getattr(token, 'column', column)) else ''} {ex.args[0]}") if token or sourcefile or lineno or column else ex
Token = namedtuple('Token', ('str', 'type', 'sourcefile', 'lineno', 'column'))
Token.default = Token('', '', '<script>', None, None)
def tokenize(data, sourcefile=None, lineno=1, re_token=re.compile(r'((?P<comment>#|#[^\S\n].*?|;.*?)$|(?P<abbr>(?:#|\'|`|,@|,)(?=[\)\]\}\S]))|(?P<atom>"(?:\\.|[^"\\])*"|[^\s\(\)\[\]\{\}"\'`#;]+)|(?P<left>[\(\[\{])|(?P<right>[\)\]\}]))\s*', flags=re.MULTILINE), re_space=re.compile(r'\s+'), _token=None):
sourcefile = sourcefile or getattr(data, 'name', None) or '<script>'
for lineno, line in enumerate(data.splitlines() if isinstance(data, str) else data, start=lineno):
pos = len(m.group()) if (m := re_space.match(line)) else 0
while (m := re_token.match(line, pos)):
yield Token(str=(_token := m.group(1)), type=tuple(k for k, v in m.groupdict().items() if v is not None)[0], sourcefile=sourcefile, lineno=lineno, column=pos + 1)
pos = m.end()
else:
if len(line := re.sub(r'\s+', ' ', line[pos:].strip())) > 0:
throw(SyntaxError(f'invalid content after token {dump(_token)}: {line[:50]}' if _token else f'invalid content: {line[:50]}'), sourcefile=sourcefile, lineno=lineno, column=pos + 1)
parenmap = {'(': ')', '[': ']', '{': '}'}
def parse(tokens, cur=None, right=None):
for cur in tokens:
if cur.type == 'right':
if cur.str == right: return
throw(SyntaxError(f'expected closing parenthesis "{right}" instead of "{cur.str}"' if right else f'unexpected closing parenthesis "{cur.str}"'), token=cur)
elif cur.type == 'left': yield SExp(parse(tokens, cur=cur, right=parenmap[cur.str]), token=cur)
elif cur.type == 'atom': yield atom(cur.str)
elif cur.type == 'abbr': yield apply_abbr(cur.str, next(parse(tokens, cur=cur)))
right is None or throw(SyntaxError(f'expected closing parenthesis "{right}" after expression'), token=cur)
def atom(token):
try:
return json.loads(token)
except ValueError:
return Symbol(token)
class Symbol(str):
_registry = weakref.WeakValueDictionary()
def __new__(cls, symbol):
if symbol not in cls._registry:
ref = cls._registry[symbol] = super(Symbol, cls).__new__(cls, symbol) # assignment holds weakref
return cls._registry[symbol]
def __call__(self, envx=global_env):
return self if self.startswith(':') else envx[self] # :keyword symbols always evaluate to themselves
def __repr__(self):
return str(self)
def apply_abbr(abbr, datum, sym_quote=Symbol('quote'), sym_quasiquote=Symbol('quasiquote'), sym_unquote=Symbol('unquote'), sym_unquote_splicing=Symbol('unquote-splicing')):
if abbr == '#' and isinstance(datum, Symbol):
return datum if datum.startswith(':') else {'t': True, 'f': False, 'n': None}[datum]
elif abbr == '\'':
return SExp((sym_quote, datum), token=getattr(datum, 'token', Token.default))
elif abbr == '`':
return SExp((sym_quasiquote, datum), token=getattr(datum, 'token', Token.default))
elif abbr == ',':
return SExp((sym_unquote, datum), token=getattr(datum, 'token', Token.default))
elif abbr == ',@':
return SExp((sym_unquote_splicing, datum), token=getattr(datum, 'token', Token.default))
else:
raise NotImplementedError(f'{abbr}{datum}')
class SExp(tuple):
ops = {}
def __new__(cls, data, token=Token.default):
if isinstance(data, SExp) or not isinstance(data, (tuple, GeneratorType)):
return data
self = super(SExp, cls).__new__(cls, data)
self.token = token
self.left = token.str
self.right = parenmap.get(token.str)
return self
def __getitem__(self, idx):
return SExp((data := tuple.__getitem__(self, idx)), token=getattr(data[0], 'token', self.token) if len(data) else self.token) if isinstance(idx, slice) else tuple.__getitem__(self, idx)
def __repr__(self):
return f"{self.left or ''}{' '.join(dump(arg) for arg in self)}{self.right or ''}"
@classmethod
def implement(cls, *names):
return lambda func: (func, [cls.ops.__setitem__(Symbol(name) if isinstance(name, str) else name, func) for name in names])[0]
@classmethod
def compile(cls, expr, envp=global_env):
if not isinstance(expr, tuple): # constant => constant
return expr
if len(expr) < 1:
return None
if (op := expr[0]) in cls.ops:
res = cls.ops[op](expr, envp=envp) # allowmacro=allowmacro
elif isinstance(op, Symbol) and (op in envp) and isinstance(envp[op], Macro):
res = cls.compile(envp[op](*expr[1:]), envp=envp) # allowmacro=allowmacro
else:
res = cls.ops[None](expr, envp=envp) # allowmacro=allowmacro
if callable(res):
res.expr = expr
return res
@SExp.implement('quote') # (quote expr) => expr
def compile_quote(expr, envp=global_env): # (quote expr) => (quote expr)
len(expr) == 2 or throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)
return lambda envx=global_env: expr[1]
@SExp.implement('if') # (if test clause-true clause-false) => clause-true | clause-false
def compile_if(expr, envp=global_env):
len(expr) in {3, 4} or throw(SyntaxError(f'expected 2 or 3 arguments in {expr}'), token=expr.token)
xargs = tuple(SExp.compile(arg, envp=envp.new_child()) for arg in expr[1:])
return lambda envx=global_env: tailcall(runx=xargs[1] if evaluate(xargs[0], envx=envx) else xargs[2] if len(xargs) > 2 else None, envx=envx)
@SExp.implement('cond') # (cond (predicate value)*) => value
def compile_cond(expr, envp=global_env, sym_else=Symbol('else')):
all(isinstance(arg, SExp) and len(arg) == 2 for arg in expr[1:]) or throw(SyntaxError(f'expected (predicate value) pairs in {expr}'), token=expr.token)
xargs = tuple((SExp.compile(predicate, envp=envp.new_child()), SExp.compile(value, envp=envp.new_child())) for predicate, value in expr[1:])
def __call__(envx=global_env):
for predicate, value in xargs:
if predicate is sym_else or evaluate(predicate, envx):
return tailcall(runx=value, envx=envx)
return __call__
@SExp.implement('set!')
def compile_set(expr, envp=global_env):
len(expr) == 3 or throw(SyntaxError(f'expected 2 arguments in {expr}'), token=expr.token)
op, name, body = expr
isinstance(name, Symbol) or throw(SyntaxError(f'{expr} <-- could only set! a symbol'), token=expr.token)
xbody = SExp.compile(body, envp=envp.new_child())
return lambda envx=global_env: env_find(envx, name).__setitem__(name, evaluate(xbody, envx=envx)) # (set! name expr)
@SExp.implement('define', 'define-macro')
def compile_define(expr, envp=global_env, sym_define_macro=Symbol('define-macro'), sym_lambda=Symbol('lambda')):
len(expr) >= 3 or throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
op, name, *body = expr
if name and isinstance(name, SExp): # (define (name args*) body+) => (define name (lambda (args*) body+))
len(name) >= 1 or throw(SyntaxError(f'expected (define (name args*) body+) instead of {expr}'), token=expr.token)
return SExp.compile(SExp((op, name[0], SExp((sym_lambda, name[1:], *body), token=expr.token)), token=expr.token), envp=envp)
else:
len(expr) == 3 or throw(SyntaxError(f'expected 2 arguments in {expr}'), token=expr.token)
isinstance(name, Symbol) or throw(SyntaxError(f'could only define a symbol; got {expr} instead'), token=expr.token)
xvalue = SExp.compile(expr[2], envp=envp.new_child())
if op is sym_define_macro:
#allowmacro or throw(SyntaxError(f'{expr} <-- define-macro only allowed at top level'), token=expr.token)
value = xvalue(envx=envp.new_child()) # original.new_child()
isinstance(value, Closure) or throw(SyntaxError(f'macro must be a procedure; got {expr} instead'), token=expr.token)
value.__class__ = Macro
envp[name] = value # (define-macro name value)
return None
else:
return lambda envx=global_env: envx.__setitem__(name, evaluate(xvalue, envx=envx)) # (define name expr)
@SExp.implement('begin') # (begin exprs*)
def compile_begin(expr, envp=global_env):
xargs = tuple(SExp.compile(arg, envp=envp) for arg in expr[1:]) # allowmacro=allowmacro
def __call__(envx=global_env):
for arg in xargs[:-1]:
evaluate(arg, envx=envx)
return tailcall(runx=xargs[-1], envx=envx)
return __call__ if len(xargs) > 0 else None
@SExp.implement('lambda') # (lambda (args*) expr)
def compile_lambda(expr, envp=global_env, sym_begin=Symbol('begin')): # (lambda (args*) exprs+) => (lambda (args*) (begin exprs+))
len(expr) >= 3 or throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
op, names, *body = expr
isinstance(names, Symbol) or (isinstance(names, tuple) and all(isinstance(name, Symbol) for name in names)) or throw(SyntaxError(f'invalid argument list in {expr}'), token=expr.token)
xbody = SExp.compile(body[0] if len(body) == 1 else SExp((sym_begin, *body), token=expr.token), envp=envp.new_child())
return lambda envx=global_env: Closure(names, xbody, envx=envx)
@SExp.implement('let')
def compile_let(expr, envp=global_env, sym_lambda=Symbol('lambda')): # (let ((name value)*) exprs+) => ((lambda (names*) exprs+) values*)
len(expr) >= 3 or throw(SyntaxError(f'expected at least 2 arguments in {expr}'), token=expr.token)
op, bindings, *body = expr
all(isinstance(b, tuple) and len(b)==2 and isinstance(b[0], Symbol) for b in bindings) or throw(SyntaxError(f'invalid binding list in {expr}'), token=expr.token)
names, values = zip(*bindings) if len(bindings) > 0 else ((), ())
return SExp.compile(SExp((SExp((sym_lambda, names, *body), token=expr.token), *values), token=expr.token), envp=envp)
@SExp.implement('quasiquote')
def compile_quasiquote(expr, envp=global_env): # Expand `x => 'x; `,x => x; `(,@x y) => (append x y)
len(expr) == 2 or throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)
return SExp.compile(compile_quasiquote_expand(expr[1]), envp=envp)
def compile_quasiquote_expand(expr, sym_unquote=Symbol('unquote'), sym_unquote_splicing=Symbol('unquote-splicing'), sym_append=Symbol('append'), sym_cons=Symbol('cons'), sym_quote=Symbol('quote')):
if isinstance(expr, SExp) and len(expr) > 0:
(expr0 := expr[0]) is not sym_unquote_splicing or throw(SyntaxError(f'invalid comma splice {expr}'), token=expr.token)
if expr0 is sym_unquote:
len(expr) == 2 or throw(SyntaxError(f'expected 1 argument in {expr}'), token=expr.token)
return expr[1]
elif isinstance(expr0, SExp) and len(expr0) > 0 and expr0[0] is sym_unquote_splicing:
len(expr0) == 2 or throw(SyntaxError(f'expected 1 argument in {expr0}'), token=expr.token)
return SExp((sym_append, expr0[1], compile_quasiquote_expand(expr[1:])), token=getattr(expr0, 'token', expr.token))
else:
return SExp((sym_cons, compile_quasiquote_expand(expr0), compile_quasiquote_expand(expr[1:])), token=getattr(expr0, 'token', expr.token))
else:
return SExp((sym_quote, expr), token=getattr(expr, 'token', Token.default))
@SExp.implement(None)
def compile_call(expr, envp=global_env):
xargs = tuple(SExp.compile(arg, envp=envp.new_child()) for arg in expr)
def __call__(envx=global_env):
values = tuple(evaluate(arg, envx=envx) for arg in xargs)
return tailcall(runx=func.body, envx=func.child_env(values[1:])) if isinstance((func := values[0]), Closure) else SExp(func(*values[1:]), token=expr.token)
return __call__
class Closure(object):
def __init__(self, params, body, envx):
self.params, self.body, self.envx = params, body, envx
def __call__(self, *args):
return evaluate(self.body, envx=self.child_env(args))
def child_env(self, args):
return self.envx.new_child(dict(((self.params, args),) if isinstance(self.params, Symbol) else zip(self.params, args))) # original.new_child()
class Macro(Closure): pass
tailcall = namedtuple('tailcall', ('runx', 'envx'))
def func_gensym(base='g', start=1000000, stop=9999999):
while base + (r := str(random.randrange(start, stop))) in Symbol._registry: pass
return Symbol(base + r)
def func_call_with_current_continuation(proc):
ex = RuntimeWarning('Cannot continue this continuation anymore.')
def escape(value):
ex.value = value
raise ex
try: return proc(escape)
except RuntimeWarning as w:
if w is ex: return ex.value
else: raise w
def environment(env=global_env):
import math
env.update(vars(math))
env.update({
'assert=': lambda a, b: a == b or throw(AssertionError(f'{a} != {b}')),
'display': lambda *args: print(*args), 'log': lambda a: (a, print(a))[0],
'+': lambda *args: sum(args), '-': lambda a, *b: a - sum(b),
'*': lambda *args: math.prod(args), '/': lambda a, *b: a/math.prod(b),
'=': lambda a, b: a == b, '!=': lambda a, b: a != b,
'<': lambda a, b: a < b, '>': lambda a, b: a > b,
'<=': lambda a, b: a <= b, '>=': lambda a, b: a >= b,
'not': lambda a: not a,
'length': lambda a: len(a),
'cons': lambda a, b: (a,) + tuple(b), 'append': lambda *args: (i for obj in args for i in obj),
'list': lambda *args: args, 'list?': lambda a: isinstance(a, tuple), 'list-has': lambda obj, v: v in obj,
'list-ref': lambda obj, i: obj[i], 'list-tail': lambda obj, i: obj[i:],
'dict': lambda *args: dict(args), 'dict-has': lambda obj, k: k in obj, 'dict-get': lambda obj, k, d=None: obj.get(k, d), 'dict-add': lambda obj, k, v: (obj := obj.copy(), obj.__setitem__(k, v))[0],
'atom?': lambda a: not isinstance(a, tuple),
'null?': lambda a: a == (),
'call/cc': func_call_with_current_continuation,
'gensym': func_gensym
})
return env
global_env = environment(global_env)
def env_find(env, key):
for m in env.maps:
if key in m:
return m
raise KeyError(key)
class EvalException(RuntimeError): pass
def evaluate(runx, envx=global_env):
while callable(runx): # Symbol or Form
try:
res = runx(envx=envx)
except (RuntimeWarning, EvalException):
raise
except Exception as ex:
raise EvalException(f'{runx.expr.token.sourcefile.encode("unicode-escape").decode("utf-8")}:{runx.expr.token.lineno}:{runx.expr.token.column}: {ex.__class__.__name__}: {ex.args[0]}') if hasattr(runx, 'expr') else ex from ex
if isinstance(res, tailcall):
runx, envx = res # unwrap
else:
return res
else: # constant
return runx
def execute(script, envx=global_env, sourcefile=None, command=False, _result=None):
if command:
return evaluate(SExp.compile(SExp(parse(tokenize(script, sourcefile=sourcefile or '<command>'))), envp=envx), envx=envx)
for expr in parse(tokenize(script, sourcefile=sourcefile)):
_result = evaluate(SExp.compile(expr, envp=envx))
return _result
def load(script, sourcefile=None):
return SExp(parse(tokenize(script, sourcefile=sourcefile)))
def dump(expr):
return repr(expr) if isinstance(expr, (SExp, Symbol)) else (dump(expr.expr) if hasattr(expr, 'expr') else repr(expr)) if callable(expr) else json.dumps(expr)
__all__ = [tokenize, parse, evaluate, execute, load, dump, SExp]
if __name__ == '__main__':
import sys
try:
import readline
except ModuleNotFoundError:
pass
if (sys_stdin_isatty := sys.stdin.isatty()):
import platform
print('Ruse 0.0.1 (a little Scheme)', (x := 'Python ' + ' '.join(sys.version.split())), '='*len(x), sep='\n')
for fname in ['stdlib.scm'] + sys.argv[1:]:
with open(fname) as f: execute(f, envx=global_env)
def readlines(prompt=''):
while True: yield input(prompt)
while True:
try:
for expr in parse(tokenize(readlines(']=> ' if sys_stdin_isatty else ''), sourcefile='<stdin>')):
res = evaluate(SExp.compile(expr, envp=global_env), envx=global_env)
print(f';{type(res).__name__}: {dump(res)}') if sys_stdin_isatty else print(dump(res))
except (EOFError, KeyboardInterrupt):
if sys_stdin_isatty: print()
break
except Exception as ex:
print(f';× {ex}' if isinstance(ex, EvalException) else f';× {type(ex).__name__}: {ex}')