diff --git a/meta-lmp-base/recipes-support/lmp-el2go-auto-register/lmp-el2go-auto-register/lmp-el2go-auto-register b/meta-lmp-base/recipes-support/lmp-el2go-auto-register/lmp-el2go-auto-register/lmp-el2go-auto-register index 3347029314..608fefb9a5 100755 --- a/meta-lmp-base/recipes-support/lmp-el2go-auto-register/lmp-el2go-auto-register/lmp-el2go-auto-register +++ b/meta-lmp-base/recipes-support/lmp-el2go-auto-register/lmp-el2go-auto-register/lmp-el2go-auto-register @@ -4,6 +4,7 @@ # Copyright (c) 2022, Foundries.io Ltd. from contextlib import contextmanager +import re import os import logging import subprocess @@ -18,6 +19,7 @@ PIN = os.environ.get("PKCS11_PIN", "87654321") SO_PIN = os.environ.get("PKCS11_SOPIN", "12345678") SOTA_DIR = os.environ.get("SOTA_DIR", "/var/sota") HANDLERS = os.environ.get("HANDLERS", "aws-iot,aktualizr-lite") +TOKEN_LABEL = os.environ.get("TOKEN_LABEL", "aktualizr") REPO_ID = os.environ["REPOID"] @@ -46,7 +48,7 @@ class Pkcs11: "--key-type=EC:prime256v1", f"--id={slot}", f"--label={label}", - "--token-label=aktualizr", + f"--token-label={TOKEN_LABEL}", ] run_quietly(args) @@ -60,11 +62,12 @@ class Pkcs11: f"--type={type}", f"--id={slot}", f"--label={label}", + f"--token-label={TOKEN_LABEL}", ] run_quietly(args) def has_labels(self, labels: List[str]) -> bool: - args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", "--list-objects"] + args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", f"--token-label={TOKEN_LABEL}", "--list-objects"] out = subprocess.check_output(args) missing = {x: 1 for x in labels} for line in out.decode().splitlines(): @@ -76,20 +79,59 @@ class Pkcs11: pass # already removed return len(missing) == 0 + @classmethod + def _init_slot_dictionary(cls, pkcs11_output: str) -> dict: + slot_dict = {} + slot_id = None + r = re.compile("\((?P[0-9x]{3})\)") + + for line in pkcs11_output.splitlines(): + if line.startswith("Slot"): + # decode slot ID + # assume output in format: + # Slot 0 (0x0): 710be08c-e795-597d-bf30-2a98bab41050 + g = r.search(line) + slot_id = g.group("slot_id") + slot_dict.update({slot_id: {}}) + if line.startswith(" "): + # decode slot dictionary + # assume output in format: + # token label : testtoken + parts = line.split(":", 1) + slot_dict[slot_id].update({parts[0].strip(): parts[1].strip()}) + return slot_dict + @classmethod def _is_initialized(cls) -> bool: - args = ["pkcs11-tool", f"--module={cls.LIB}", f"--pin={PIN}", "--list-objects"] + args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"] try: - subprocess.check_output(args, stderr=subprocess.STDOUT) + out = subprocess.check_output(args, stderr=subprocess.STDOUT) + slot_dict = cls._init_slot_dictionary(out.decode()) + for slot_id, slot in slot_dict.items(): + token_label = slot.get("token label") + if token_label == f"{TOKEN_LABEL}": + return True except subprocess.CalledProcessError: return False - return True + return False @classmethod def _initialize(cls): + args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"] + out = subprocess.check_output(args, stderr=subprocess.STDOUT) + slot_dict = cls._init_slot_dictionary(out.decode()) + empty_slot = None + for slot_id, slot in slot_dict.items(): + token_state = slot.get("token state") + if token_state == "uninitialized": + empty_slot = slot_id + break + if empty_slot is None: + # raise exception as there are no more available slots + raise RuntimeError("No free PKCS11 slots") args = ["pkcs11-tool", f"--module={cls.LIB}", f"--so-pin={SO_PIN}"] - subprocess.check_call(args + ["--init-token", "--label", "aktualizr"]) - subprocess.check_call(args + ["--init-pin", f"--pin={PIN}"]) + subprocess.check_call(args + ["--init-token", "--label", f"{TOKEN_LABEL}", "--slot", f"{empty_slot}"]) + subprocess.check_call(args + ["--init-pin", f"--pin={PIN}", "--slot", f"{empty_slot}"]) @classmethod def get_initialized(cls) -> "Pkcs11":