3
0
Fork 0
mirror of https://github.com/YosysHQ/yosys synced 2025-04-07 01:54:10 +00:00
yosys/tests/rpc/frontend.py
whitequark 99a7f39084 rpc: new frontend.
A new pass, connect_rpc, allows any HDL frontend that can read/write
JSON from/to stdin/stdout or an unix socket or a named pipe to
participate in elaboration as a first class citizen, such that any
other HDL supported by Yosys directly or indirectly can transparently
instantiate modules handled by this frontend.

Recognizing that many HDL frontends emit Verilog, it allows the RPC
frontend to direct Yosys to process the result of instantiation via
any built-in Yosys frontend. The resulting RTLIL is then hygienically
integrated into the overall design.
2019-09-30 15:53:11 +00:00

127 lines
3.3 KiB
Python

def modules():
return ["python_inv"]
def derive(module, parameters):
assert module == r"python_inv"
if parameters.keys() != {r"\width"}:
raise ValueError("Invalid parameters")
return "ilang", r"""
module \impl
wire width {width:d} input 1 \i
wire width {width:d} output 2 \o
cell $neg $0
parameter \A_SIGNED 1'0
parameter \A_WIDTH 32'{width:b}
parameter \Y_WIDTH 32'{width:b}
connect \A \i
connect \Y \o
end
end
module \python_inv
wire width {width:d} input 1 \i
wire width {width:d} output 2 \o
cell \impl $0
connect \i \i
connect \o \o
end
end
""".format(width=parameters[r"\width"])
# ----------------------------------------------------------------------------
import json
import argparse
import sys, socket, os
try:
import msvcrt, win32pipe, win32file
except ImportError:
msvcrt = win32pipe = win32file = None
def map_parameter(parameter):
if parameter["type"] == "unsigned":
return int(parameter["value"], 2)
if parameter["type"] == "signed":
width = len(parameter["value"])
value = int(parameter["value"], 2)
if value & (1 << (width - 1)):
value = -((1 << width) - value)
return value
if parameter["type"] == "string":
return parameter["value"]
if parameter["type"] == "real":
return float(parameter["value"])
def call(input_json):
input = json.loads(input_json)
if input["method"] == "modules":
return json.dumps({"modules": modules()})
if input["method"] == "derive":
try:
frontend, source = derive(input["module"],
{name: map_parameter(value) for name, value in input["parameters"].items()})
return json.dumps({"frontend": frontend, "source": source})
except ValueError as e:
return json.dumps({"error": str(e)})
def main():
parser = argparse.ArgumentParser()
modes = parser.add_subparsers(dest="mode")
mode_stdio = modes.add_parser("stdio")
if os.name == "posix":
mode_path = modes.add_parser("unix-socket")
if os.name == "nt":
mode_path = modes.add_parser("named-pipe")
mode_path.add_argument("path")
args = parser.parse_args()
if args.mode == "stdio":
while True:
input = sys.stdin.readline()
if not input: break
sys.stdout.write(call(input) + "\n")
sys.stdout.flush()
if args.mode == "unix-socket":
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(args.path)
try:
sock.listen(1)
conn, addr = sock.accept()
file = conn.makefile("rw")
while True:
input = file.readline()
if not input: break
file.write(call(input) + "\n")
file.flush()
finally:
sock.close()
os.unlink(args.path)
if args.mode == "named-pipe":
pipe = win32pipe.CreateNamedPipe(args.path, win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_BYTE|win32pipe.PIPE_READMODE_BYTE|win32pipe.PIPE_WAIT,
1, 4096, 4096, 0, None)
win32pipe.ConnectNamedPipe(pipe, None)
try:
while True:
input = b""
while not input.endswith(b"\n"):
result, data = win32file.ReadFile(pipe, 4096)
assert result == 0
input += data
assert not b"\n" in input or input.endswith(b"\n")
output = (call(input.decode("utf-8")) + "\n").encode("utf-8")
length = len(output)
while length > 0:
result, done = win32file.WriteFile(pipe, output)
assert result == 0
length -= done
except win32file.error as e:
if e.args[0] == 109: # ERROR_BROKEN_PIPE
pass
else:
raise
if __name__ == "__main__":
main()