Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Support for Starknet/Cairo challenge #1

Merged
merged 10 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/include
**/__pycache__
.DS_Store
dump.rdb
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.11.3
57 changes: 49 additions & 8 deletions paradigmctf.py/ctf_launchers/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import traceback
from dataclasses import dataclass
from typing import Callable, Dict, List
import requests
import json

import requests
from ctf_launchers.team_provider import TeamProvider
from ctf_launchers.utils import deploy, http_url_to_ws
from ctf_launchers.utils import deploy, deploy_cairo, http_url_to_ws
from ctf_server.types import (
CreateInstanceRequest,
DaemonInstanceArgs,
Expand All @@ -33,8 +35,9 @@ class Action:

class Launcher(abc.ABC):
def __init__(
self, project_location: str, provider: TeamProvider, actions: List[Action] = []
self, type: str, project_location: str, provider: TeamProvider, actions: List[Action] = []
):
self.type = type
self.project_location = project_location
self.__team_provider = provider

Expand Down Expand Up @@ -105,6 +108,7 @@ def launch_instance(self) -> int:
body = requests.post(
f"{ORCHESTRATOR_HOST}/instances",
json=CreateInstanceRequest(
type=self.type,
instance_id=self.get_instance_id(),
timeout=TIMEOUT,
anvil_instances=self.get_anvil_instances(),
Expand All @@ -117,7 +121,17 @@ def launch_instance(self) -> int:
user_data = body["data"]

print("deploying challenge...")
challenge_addr = self.deploy(user_data, self.mnemonic)

if self.type == "starknet":
web3 = get_privileged_web3(user_data, "main")

credentials = self.get_credentials(web3.provider.endpoint_uri)

challenge_addr = self.deploy_cairo(user_data, credentials)
priv_key = credentials[1][1]
else:
challenge_addr = self.deploy(user_data, self.mnemonic)
priv_key = get_player_account(self.mnemonic).key.hex()

self.update_metadata(
{"mnemonic": self.mnemonic, "challenge_address": challenge_addr}
Expand All @@ -127,19 +141,24 @@ def launch_instance(self) -> int:

print()
print(f"your private blockchain has been set up")
print(f"it will automatically terminate in {TIMEOUT} minutes")
print(f"it will automatically terminate in {round(TIMEOUT/60)} minutes")
print(f"---")
print(f"rpc endpoints:")
for id in user_data["anvil_instances"]:
print(f" - {PUBLIC_HOST}/{user_data['external_id']}/{id}")
print(f" - {PUBLIC_WEBSOCKET_HOST}/{user_data['external_id']}/{id}/ws")
print(
f" - {PUBLIC_WEBSOCKET_HOST}/{user_data['external_id']}/{id}/ws")

print(f"private key: {get_player_account(self.mnemonic).key.hex()}")
if self.type == "starknet":
print(f"player address: {credentials[1][0]}")
print(
f"private key: {priv_key}")
print(f"challenge contract: {challenge_addr}")
return 0

def kill_instance(self) -> int:
resp = requests.delete(f"{ORCHESTRATOR_HOST}/instances/${self.get_instance_id()}")
resp = requests.delete(
f"{ORCHESTRATOR_HOST}/instances/{self.get_instance_id()}")
body = resp.json()

print(body["message"])
Expand All @@ -149,8 +168,30 @@ def deploy(self, user_data: UserData, mnemonic: str) -> str:
web3 = get_privileged_web3(user_data, "main")

return deploy(
web3, self.project_location, mnemonic, env=self.get_deployment_args(user_data)
web3, self.project_location, mnemonic, env=self.get_deployment_args(
user_data)
)

def deploy_cairo(self, user_data: UserData, credentials: list) -> str:
web3 = get_privileged_web3(user_data, "main")

return deploy_cairo(web3, self.project_location, credentials, env=self.get_deployment_args(user_data))


def get_deployment_args(self, user_data: UserData) -> Dict[str, str]:
return {}

def get_credentials(self, url: str) -> list:
x = requests.get(url + '/predeployed_accounts')
data = json.loads(x.text)

system = []
player = []

system.append(data[0]['address'])
system.append(data[0]['private_key'])

player.append(data[1]['address'])
player.append(data[1]['private_key'])

return [system, player]
1 change: 1 addition & 0 deletions paradigmctf.py/ctf_launchers/pwn_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(
provider: TeamProvider = get_team_provider(),
):
super().__init__(
'ethereum',
project_location,
provider,
[
Expand Down
63 changes: 63 additions & 0 deletions paradigmctf.py/ctf_launchers/starknet_pwn_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import json
import requests
from ctf_launchers.launcher import Action, Launcher, ORCHESTRATOR_HOST, CHALLENGE
from ctf_launchers.team_provider import TeamProvider, get_team_provider
from ctf_server.types import UserData, get_privileged_web3

FLAG = os.getenv("FLAG", "PCTF{flag}")


class StarknetPwnChallengeLauncher(Launcher):
def __init__(
self,
project_location: str = "challenge/project",
provider: TeamProvider = get_team_provider(),
):
super().__init__(
'starknet',
project_location,
provider,
[
Action(name="get flag", handler=self.get_flag),
],
)

def get_flag(self) -> int:
instance_body = requests.get(
f"{ORCHESTRATOR_HOST}/instances/{self.get_instance_id()}").json()
if not instance_body['ok']:
print(instance_body['message'])
return 1

user_data = instance_body['data']

if not self.is_solved(
user_data, user_data['metadata']["challenge_address"]
):
print("are you sure you solved it?")
return 1

print(FLAG)
return 0

def is_solved(self, user_data: UserData, addr: str) -> bool:
web3 = get_privileged_web3(user_data, "main")

x = requests.post(web3.provider.endpoint_uri + "/rpc", json={
"id": 1,
"jsonrpc": "2.0",
"method": "starknet_call",
"params": [
{
"contract_address": addr,
"calldata": [],
"entry_point_selector": "0x1f8ddd388f265b0bcab25a3e457e789fe182bdf8ede59d9ef42b3158a533c8"
},
"latest"
]
})

solved = True if json.loads(x.text)['result'][0] == "0x0" else False

return solved
40 changes: 40 additions & 0 deletions paradigmctf.py/ctf_launchers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,46 @@ def deploy(
return result


def deploy_cairo(
web3: Web3,
project_location: str,
credentials: list,
deploy_script: str = "deploy.py",
env: Dict = {},
) -> str:
rfd, wfd = os.pipe2(os.O_NONBLOCK)

proc = subprocess.Popen(
args=[
"/usr/local/bin/python3",
deploy_script,
],
env={
"RPC_URL": web3.provider.endpoint_uri + "/rpc",
"PRIVATE_KEY": credentials[0][1],
"ACCOUNT_ADDRESS": credentials[0][0],
}
| env,
pass_fds=[wfd],
cwd=project_location,
text=True,
encoding="utf8",
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()

if proc.returncode != 0:
print(stdout)
print(stderr)
raise Exception("script failed to run")

output = stdout.split('address: ')[1].replace("\\n", "")

return output[:65]


def anvil_setCodeFromFile(
web3: Web3,
addr: str,
Expand Down
2 changes: 1 addition & 1 deletion paradigmctf.py/ctf_server/anvil_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .utils import load_database


ALLOWED_NAMESPACES = ["web3", "eth", "net"]
ALLOWED_NAMESPACES = ["web3", "eth", "net", "starknet"]
DISALLOWED_METHODS = [
"eth_sign",
"eth_signTransaction",
Expand Down
11 changes: 11 additions & 0 deletions paradigmctf.py/ctf_server/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import string
import time
from threading import Thread
import requests
import json

from ctf_server.databases.database import Database
from ctf_server.types import (
Expand All @@ -18,6 +20,7 @@
from eth_account import Account
from eth_account.hdaccount import key_from_seed, seed_from_mnemonic
from foundry.anvil import anvil_setBalance
from starknet.anvil import starknet_getVersion
from web3 import Web3


Expand Down Expand Up @@ -97,3 +100,11 @@ def _prepare_node(self, args: LaunchAnvilInstanceArgs, web3: Web3):
).address,
hex(int(args.get("balance", DEFAULT_BALANCE) * 10**18)),
)

def _prepare_node_starknet(self, args: LaunchAnvilInstanceArgs, web3: Web3):
while True:
try:
starknet_getVersion(web3)
break
except:
time.sleep(0.1)
84 changes: 55 additions & 29 deletions paradigmctf.py/ctf_server/backends/docker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shlex
import time
from typing import Dict, List
import requests

import docker
from ctf_server.databases.database import Database
Expand All @@ -12,6 +13,7 @@
InstanceInfo,
UserData,
format_anvil_args,
format_starknet_args
)
from docker.errors import APIError, NotFound
from docker.models.containers import Container
Expand All @@ -36,27 +38,43 @@ def _launch_instance_impl(self, request: CreateInstanceRequest) -> UserData:

anvil_containers: Dict[str, Container] = {}
for anvil_id, anvil_args in request["anvil_instances"].items():
anvil_containers[anvil_id] = self.__client.containers.run(
name=f"{instance_id}-{anvil_id}",
image=anvil_args.get("image", DEFAULT_IMAGE),
network="paradigmctf",
entrypoint=["sh", "-c"],
command=[
"while true; do anvil "
+ " ".join(
[
shlex.quote(str(v))
for v in format_anvil_args(anvil_args, anvil_id)
]
)
+ "; sleep 1; done;"
],
restart_policy={"Name": "always"},
detach=True,
mounts=[
Mount(target="/data", source=volume.id),
],
)
if request["type"] == "starknet":
anvil_containers[anvil_id] = self.__client.containers.run(
name=f"{instance_id}-{anvil_id}",
image=anvil_args.get("image", "shardlabs/starknet-devnet-rs"),
network="paradigmctf",
entrypoint=["tini", "--", "starknet-devnet"] + [
shlex.quote(str(v))
for v in format_starknet_args(anvil_args, anvil_id)
],
restart_policy={"Name": "always"},
detach=True,
mounts=[
Mount(target="/data", source=volume.id),
],
)
else:
anvil_containers[anvil_id] = self.__client.containers.run(
name=f"{instance_id}-{anvil_id}",
image=anvil_args.get("image", DEFAULT_IMAGE),
network="paradigmctf",
entrypoint=["sh", "-c"],
command=[
"while true; do anvil "
+ " ".join(
[
shlex.quote(str(v))
for v in format_anvil_args(anvil_args, anvil_id)
]
)
+ "; sleep 1; done;"
],
restart_policy={"Name": "always"},
detach=True,
mounts=[
Mount(target="/data", source=volume.id),
],
)

daemon_containers: Dict[str, Container] = {}
for daemon_id, daemon_args in request.get("daemon_instances", {}).items():
Expand All @@ -83,14 +101,22 @@ def _launch_instance_impl(self, request: CreateInstanceRequest) -> UserData:
"port": 8545,
}

self._prepare_node(
request["anvil_instances"][anvil_id],
Web3(
Web3.HTTPProvider(
f"http://{anvil_instances[anvil_id]['ip']}:{anvil_instances[anvil_id]['port']}"
)
),
)
url = f"http://{anvil_instances[anvil_id]['ip']}:{anvil_instances[anvil_id]['port']}"

if request["type"] == "starknet":
self._prepare_node_starknet(
request["anvil_instances"][anvil_id],
Web3(
Web3.HTTPProvider(url)
),
)
else:
self._prepare_node(
request["anvil_instances"][anvil_id],
Web3(
Web3.HTTPProvider(url)
),
)

daemon_instances = {}
for daemon_id, daemon_container in daemon_containers.items():
Expand Down
Loading
Loading