Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for generating bindings for token contracts. #8

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions stellar_contract_bindings/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import dataclasses
from typing import List, Optional, Tuple, Type, Union

import xdrlib3
from stellar_sdk import xdr


Expand Down Expand Up @@ -108,3 +109,17 @@ def parse_entries(
offset += len(entry.to_xdr_bytes())
entries.append(entry)
return entries


def get_token_sc_spec_entry() -> list[xdr.SCSpecEntry]:
"""Get the token contract spec entry."""

# A little bit hacky, but it works
# TODO: find a way to get the token contract spec entry in the repo
# https://github.com/stellar/stellar-cli/blob/a11a924d310c1602e7b579377daa3e373010ac0e/cmd/soroban-cli/src/get_spec.rs#L77
raw_xdr = ""
unpacker = xdrlib3.Unpacker(base64.b64decode(raw_xdr))
specs = []
while unpacker.get_position() < len(base64.b64decode(raw_xdr)):
specs.append(xdr.SCSpecEntry.unpack(unpacker))
return specs
103 changes: 79 additions & 24 deletions stellar_contract_bindings/python.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import keyword
import os
from typing import List

Expand All @@ -7,11 +8,7 @@
from stellar_sdk import xdr

from stellar_contract_bindings import __version__ as stellar_contract_bindings_version
from stellar_contract_bindings.metadata import parse_contract_metadata
from stellar_contract_bindings.utils import (
get_wasm_hash_by_contract_id,
get_contract_wasm_by_hash,
)
from stellar_contract_bindings.utils import get_specs_by_contract_id


def is_tuple_struct(entry: xdr.SCSpecUDTStructV0) -> bool:
Expand Down Expand Up @@ -300,7 +297,7 @@ def __init__(self, {% for field in entry.fields %}{{ field.name.decode() }}: {{
def to_scval(self) -> xdr.SCVal:
return scval.to_struct({
{%- for field in entry.fields %}
'{{ field.name.decode() }}': {{ to_scval(field.type, 'self.' ~ field.name.decode()) }}{% if not loop.last %},{% endif %}
'{{ field.name_r.decode() if field.name_r else field.name.decode() }}': {{ to_scval(field.type, 'self.' ~ field.name.decode()) }}{% if not loop.last %},{% endif %}
{%- endfor %}
})

Expand All @@ -309,7 +306,7 @@ def from_scval(cls, val: xdr.SCVal):
elements = scval.from_struct(val)
return cls(
{%- for index, field in enumerate(entry.fields) %}
{{ from_scval(field.type, 'elements["' ~ field.name.decode() ~ '"]') }}{% if not loop.last %},{% endif %}
{{ from_scval(field.type, 'elements["' ~ (field.name_r.decode() if field.name_r else field.name.decode()) ~ '"]') }}{% if not loop.last %},{% endif %}
{%- endfor %}
)

Expand Down Expand Up @@ -371,9 +368,9 @@ def render_union(entry: xdr.SCSpecUDTUnionV0):
class {{ entry.name.decode() }}Kind(Enum):
{%- for case in entry.cases %}
{%- if case.kind == xdr.SCSpecUDTUnionCaseV0Kind.SC_SPEC_UDT_UNION_CASE_VOID_V0 %}
{{ case.void_case.name.decode() }} = '{{ case.void_case.name.decode() }}'
{{ case.void_case.name.decode() }} = '{{ case.void_case.name_r.decode() if case.void_case.name_r else case.void_case.name.decode() }}'
{%- else %}
{{ case.tuple_case.name.decode() }} = '{{ case.tuple_case.name.decode() }}'
{{ case.tuple_case.name.decode() }} = '{{ case.tuple_case.name.decode() if case.tuple_case.name_r else case.tuple_case.name.decode() }}'
{%- endif %}
{%- endfor %}
"""
Expand Down Expand Up @@ -494,7 +491,7 @@ def {{ entry.name.sc_symbol.decode() }}(self, {% for param in entry.inputs %}{{
{%- if entry.doc %}
"""{{ entry.doc.decode() }}"""
{%- endif %}
return self.invoke('{{ entry.name.sc_symbol.decode() }}', [{% for param in entry.inputs %}{{ to_scval(param.type, param.name.decode()) }}{% if not loop.last %}, {% endif %}{% endfor %}], parse_result_xdr_fn={{ parse_result_xdr_fn(entry.outputs) }}, source = source, signer = signer, base_fee = base_fee, transaction_timeout = transaction_timeout, submit_timeout = submit_timeout, simulate = simulate, restore = restore)
return self.invoke('{{ entry.name.sc_symbol_r.decode() if entry.name.sc_symbol_r else entry.name.sc_symbol.decode() }}', [{% for param in entry.inputs %}{{ to_scval(param.type, param.name.decode()) }}{% if not loop.last %}, {% endif %}{% endfor %}], parse_result_xdr_fn={{ parse_result_xdr_fn(entry.outputs) }}, source = source, signer = signer, base_fee = base_fee, transaction_timeout = transaction_timeout, submit_timeout = submit_timeout, simulate = simulate, restore = restore)
{%- endfor %}
{%- endif %}

Expand All @@ -505,7 +502,7 @@ async def {{ entry.name.sc_symbol.decode() }}(self, {% for param in entry.inputs
{%- if entry.doc %}
"""{{ entry.doc.decode() }}"""
{%- endif %}
return await self.invoke('{{ entry.name.sc_symbol.decode() }}', [{% for param in entry.inputs %}{{ to_scval(param.type, param.name.decode()) }}{% if not loop.last %}, {% endif %}{% endfor %}], parse_result_xdr_fn={{ parse_result_xdr_fn(entry.outputs) }}, source = source, signer = signer, base_fee = base_fee, transaction_timeout = transaction_timeout, submit_timeout = submit_timeout, simulate = simulate, restore = restore)
return await self.invoke('{{ entry.name.sc_symbol_r.decode() if entry.name.sc_symbol_r else entry.name.sc_symbol.decode() }}', [{% for param in entry.inputs %}{{ to_scval(param.type, param.name.decode()) }}{% if not loop.last %}, {% endif %}{% endfor %}], parse_result_xdr_fn={{ parse_result_xdr_fn(entry.outputs) }}, source = source, signer = signer, base_fee = base_fee, transaction_timeout = transaction_timeout, submit_timeout = submit_timeout, simulate = simulate, restore = restore)
{%- endfor %}
{%- endif %}
'''
Expand Down Expand Up @@ -539,13 +536,70 @@ def parse_result_xdr_fn(output: List[xdr.SCSpecTypeDef]):
return client_rendered_code


def generate_binding(wasm: bytes, client_type: str) -> str:
generated = []
generated.append(render_info())
# append _ to keyword
def append_underscore(specs: List[xdr.SCSpecEntry]):
for spec in specs:
if spec.kind == xdr.SCSpecEntryKind.SC_SPEC_ENTRY_UDT_STRUCT_V0:
assert spec.udt_struct_v0 is not None
if keyword.iskeyword(spec.udt_struct_v0.name.decode()):
spec.udt_struct_v0.name_r = spec.udt_struct_v0.name # type: ignore[attr-defined]
spec.udt_struct_v0.name = spec.udt_struct_v0.name + b"_"
for field in spec.udt_struct_v0.fields:
if keyword.iskeyword(field.name.decode()):
field.name = field.name + b"_"
if spec.kind == xdr.SCSpecEntryKind.SC_SPEC_ENTRY_UDT_UNION_V0:
assert spec.udt_union_v0 is not None
if keyword.iskeyword(spec.udt_union_v0.name.decode()):
spec.udt_union_v0.name_r = spec.udt_union_v0.name # type: ignore[attr-defined]
spec.udt_union_v0.name = spec.udt_union_v0.name + b"_"
for union_case in spec.udt_union_v0.cases:
if (
union_case.kind
== xdr.SCSpecUDTUnionCaseV0Kind.SC_SPEC_UDT_UNION_CASE_TUPLE_V0
):
if keyword.iskeyword(union_case.tuple_case.name.decode()):
union_case.tuple_case.name_r = union_case.tuple_case.name # type: ignore[attr-defined]
union_case.tuple_case.name = union_case.tuple_case.name + b"_"
elif (
union_case.kind
== xdr.SCSpecUDTUnionCaseV0Kind.SC_SPEC_UDT_UNION_CASE_VOID_V0
):
if keyword.iskeyword(union_case.void_case.name.decode()):
union_case.void_case.name_r = union_case.void_case.name # type: ignore[attr-defined]
union_case.void_case.name = union_case.void_case.name + b"_"
else:
raise ValueError(f"Unsupported union case kind: {union_case.kind}")
if spec.kind == xdr.SCSpecEntryKind.SC_SPEC_ENTRY_FUNCTION_V0:
assert spec.function_v0 is not None
if keyword.iskeyword(spec.function_v0.name.sc_symbol.decode()):
spec.function_v0.name.sc_symbol_r = spec.function_v0.name.sc_symbol # type: ignore[attr-defined]
spec.function_v0.name.sc_symbol = spec.function_v0.name.sc_symbol + b"_"
for param in spec.function_v0.inputs:
if keyword.iskeyword(param.name.decode()):
param.name = param.name + b"_"
if spec.kind == xdr.SCSpecEntryKind.SC_SPEC_ENTRY_UDT_ENUM_V0:
assert spec.udt_enum_v0 is not None
if keyword.iskeyword(spec.udt_enum_v0.name.decode()):
spec.udt_enum_v0.name_r = spec.udt_enum_v0.name # type: ignore[attr-defined]
spec.udt_enum_v0.name = spec.udt_enum_v0.name + b"_"
for enum_case in spec.udt_enum_v0.cases:
if keyword.iskeyword(enum_case.name.decode()):
enum_case.name = enum_case.name + b"_"
if spec.kind == xdr.SCSpecEntryKind.SC_SPEC_ENTRY_UDT_ERROR_ENUM_V0:
assert spec.udt_error_enum_v0 is not None
if keyword.iskeyword(spec.udt_error_enum_v0.name.decode()):
spec.udt_error_enum_v0.name_r = spec.udt_error_enum_v0.name # type: ignore[attr-defined]
spec.udt_error_enum_v0.name = spec.udt_error_enum_v0.name + b"_"
for error_enum_case in spec.udt_error_enum_v0.cases:
if keyword.iskeyword(error_enum_case.name.decode()):
error_enum_case.name = error_enum_case.name + b"_"


metadata = parse_contract_metadata(wasm)
specs = metadata.spec
def generate_binding(specs: List[xdr.SCSpecEntry], client_type: str) -> str:
append_underscore(specs)

generated = []
generated.append(render_info())
generated.append(render_imports(client_type))

for spec in specs:
Expand Down Expand Up @@ -599,16 +653,13 @@ def command(contract_id: str, rpc_url: str, output: str, client_type: str):
if output is None:
output = os.getcwd()
try:
wasm_id = get_wasm_hash_by_contract_id(contract_id, rpc_url)
click.echo(f"Got wasm id: {wasm_id.hex()}")
wasm_code = get_contract_wasm_by_hash(wasm_id, rpc_url)
click.echo(f"Got wasm code")
specs = get_specs_by_contract_id(contract_id, rpc_url)
except Exception as e:
click.echo(f"Error: {str(e)}", err=True)
click.echo(f"Get contract specs failed: {e}", err=True)
raise click.Abort()

click.echo("Generating Python bindings")
generated = generate_binding(wasm_code, client_type=client_type)
generated = generate_binding(specs, client_type=client_type)
if not os.path.exists(output):
os.makedirs(output)
output_path = os.path.join(output, "bindings.py")
Expand All @@ -621,8 +672,12 @@ def command(contract_id: str, rpc_url: str, output: str, client_type: str):


if __name__ == "__main__":
from stellar_contract_bindings.metadata import parse_contract_metadata

wasm_file = "/Users/overcat/repo/lightsail/stellar-contract-bindings/tests/contracts/target/wasm32-unknown-unknown/release/python.wasm"
with open(wasm_file, "rb") as f:
wasm = f.read()
generated_code = generate_binding(wasm, "both")
print(generated_code)

specs = parse_contract_metadata(wasm).spec
generated = generate_binding(specs, client_type="both")
print(generated)
Loading