
add: Cloud-edge collaborative inference for LLM based on KubeEdge-Ianvs #122

Merged
merged 5 commits into kubeedge:main on Aug 27, 2024

Conversation

FuryMartin
Contributor

Cloud-edge collaborative inference for LLM based on KubeEdge-Ianvs proposal

What type of PR is this?
/kind design

What this PR does / why we need it:
This PR proposes a cloud-edge collaborative inference algorithm for LLMs. It combines the high inference accuracy of cloud LLMs with the strong privacy and fast inference of edge LLMs through a cloud-edge collaboration strategy, so as to better meet the needs of edge users.

Which issue(s) this PR fixes:
#96

Signed-off-by: Yu Fan <fany@buaa.edu.cn>
Signed-off-by: Yu Fan <fany@buaa.edu.cn>
Signed-off-by: Yu Fan <fany@buaa.edu.cn>
@hsj576
Member

hsj576 commented Jul 18, 2024

The PR needs to add the design of how to implement this feature in Ianvs, especially to ensure that the new interface is consistent with the interface in Sedna.

Signed-off-by: Yu Fan <fany@buaa.edu.cn>
@FuryMartin
Contributor Author

Thanks to @hsj576 and @MooreZheng for the review.

While trying to improve my proposal, I found that the integration of the new algorithm still needs to be discussed.

Basically, there are two possible plans to integrate the cloud-edge collaborative strategy for LLMs.

  • Plan A will introduce Sedna's JointInference to ianvs/core, and modify it to support collaboration in LLM scenarios. The modified class can be contributed back to Sedna.
  • Plan B will leave the implementation of the collaborative strategy entirely up to users (i.e., implemented in examples/), with no specific details of Joint Inference included in Ianvs/core, making introducing Sedna's JointInference unnecessary.

In our last two meetings, we had some discussions about implementation details. From my personal understanding, @MooreZheng appears to prefer using Plan A to integrate Sedna's JointInference interface for alignment, while @hsj576’s initial idea aligns more with Plan B, which leaves the collaborative strategy to be implemented in examples/.

However, these two approaches differ significantly, and I would like to seek advice on which integration method should be adopted. If my understanding is incorrect, please feel free to point it out directly.

In the following sections, I will present the details of Plan A and Plan B, highlighting their respective advantages and disadvantages.

Since we are designing a new algorithm, it's essential to have a thorough understanding of the code of both Sedna and Ianvs. I will first introduce the implementation logic and existing issues of JointInference in Sedna, followed by a review of how Ianvs integrates its algorithm paradigms.

1 Sedna's JointInference

1.1 Implementation Details

The core class of the Joint Inference feature in Sedna is JointInference:

class JointInference(JobBase):

which mainly has an __init__ function and an inference function.

1.1.1 __init__(self, estimator=None, hard_example_mining: dict = None)

The constructor __init__ receives an estimator as the edge model instance and hard_example_mining as the configuration for the hard example mining algorithm. The specific logic is as follows.

First, get the local host IP, the remote IP, and the port by reading environment variables, as shown below: Source Code

    def __init__(self, estimator=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
        self.job_kind = K8sResourceKind.JOINT_INFERENCE_SERVICE.value
        self.local_ip = get_host_ip()
        self.remote_ip = self.get_parameters(
            "BIG_MODEL_IP", self.local_ip)
        self.port = int(self.get_parameters("BIG_MODEL_PORT", "5000"))

Next, declare the local reporter to report inference progress: Source Code

        report_msg = {
            "name": self.worker_name,
            "namespace": self.config.namespace,
            "ownerName": self.job_name,
            "ownerKind": self.job_kind,
            "kind": "inference",
            "results": []
        }
        period_interval = int(self.get_parameters("LC_PERIOD", "30"))
        self.lc_reporter = LCReporter(lc_server=self.config.lc_server,
                                      message=report_msg,
                                      period_interval=period_interval)
        self.lc_reporter.setDaemon(True)
        self.lc_reporter.start()

Then, load the edge model: Source Code

        if callable(self.estimator):
            self.estimator = self.estimator()
        if not os.path.exists(self.model_path):
            raise FileExistsError(f"{self.model_path} miss")
        else:
            self.estimator.load(self.model_path)

Subsequently, create a client for calling the cloud model: Source Code

        self.cloud = ModelClient(service_name=self.job_name,
                                 host=self.remote_ip, port=self.port)

Finally, instantiate the hard example mining algorithm: Source Code

        self.hard_example_mining_algorithm = None
        if not hard_example_mining:
            hard_example_mining = self.get_hem_algorithm_from_config()
        if hard_example_mining:
            hem = hard_example_mining.get("method", "IBT")
            hem_parameters = hard_example_mining.get("param", {})
            self.hard_example_mining_algorithm = ClassFactory.get_cls(
                ClassType.HEM, hem
            )(**hem_parameters)
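
For reference, a hard_example_mining configuration that matches the parsing logic above might look like the following. This is a minimal sketch; the threshold values are illustrative rather than Sedna defaults.

    # Hypothetical HEM configuration: "method" selects the registered class,
    # "param" is passed to its constructor as keyword arguments.
    hard_example_mining = {
        "method": "IBT",
        "param": {
            "threshold_img": 0.9,   # illustrative value
            "threshold_box": 0.9,   # illustrative value
        }
    }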

1.1.2 inference(self, data=None, post_process=None, **kwargs)

The inference function takes the data parameter as input and the post_process parameter as a post-processing function. The specific logic is as follows:

First, register post_process as callback_func, serving as a general post-processing function: Source Code

    def inference(self, data=None, post_process=None, **kwargs):
    	callback_func = None
        if callable(post_process):
            callback_func = post_process
        elif post_process is not None:
            callback_func = ClassFactory.get_cls(
                ClassType.CALLBACK, post_process)

Next, perform edge model inference and report: Source Code

        res = self.estimator.predict(data, **kwargs)
        edge_result = deepcopy(res)

        if callback_func:
            res = callback_func(res)

        self.lc_reporter.update_for_edge_inference()

Then, determine whether the example is hard or not: Source Code

        if self.hard_example_mining_algorithm:
            is_hard_example = self.hard_example_mining_algorithm(res)

If it is a hard example, call the cloud model for inference and report: Source Code

            if is_hard_example:
                try:
                    cloud_data = self.cloud.inference(
                        data.tolist(), post_process=post_process, **kwargs)
                    cloud_result = cloud_data["result"]
                except Exception as err:
                    self.log.error(f"get cloud result error: {err}")
                else:
                    res = cloud_result
                self.lc_reporter.update_for_collaboration_inference()

Finally, return the inference result: Source Code

        return [is_hard_example, res, edge_result, cloud_result]
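
To illustrate how a caller consumes this return value, a minimal usage sketch might look as follows. The Estimator class and the data sample are placeholders, and the cloud address is still taken from the BIG_MODEL_IP / BIG_MODEL_PORT environment variables read in the constructor.

    # Hypothetical usage of Sedna's JointInference (sketch only).
    inference_instance = JointInference(
        estimator=Estimator,                     # user-defined edge estimator
        hard_example_mining={"method": "IBT"}    # HEM config as shown above
    )

    is_hard_example, final_result, edge_result, cloud_result = \
        inference_instance.inference(data)       # data: one input sample

    # final_result equals cloud_result for hard examples, edge_result otherwise.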

1.2 Problems

1.2.1 The current collaborative strategy is hard-coded and not suitable for LLMs.

As shown in the code above, this JointInference class essentially defines a fixed collaborative workflow where edge inference is performed first, followed by hard example mining, and then a decision is made on whether cloud inference is needed.

This approach is suitable for edge-cloud collaboration in computer vision Object Detection tasks. The example of JointInference provided by Sedna is Helmet Detection Inference, which uses IBT (image-box-thresholds) as the hard example mining algorithm. The general process includes (a minimal sketch follows the list):

  • Edge model inference to obtain object detection boxes;
  • Calculating hard coefficients based on the confidence of the detection boxes and a preset threshold threshold_box;
  • If the hard coefficient exceeds the threshold threshold_img, the example is considered hard and should be offloaded to the cloud for inference.
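
As a concrete illustration of the process described in the list above, a simplified IBT-style check could be written as follows. This is a sketch based on the description, not Sedna's actual implementation; the box format is an assumption.

    def ibt_is_hard_example(boxes, threshold_box=0.5, threshold_img=0.5):
        """Simplified IBT-style decision (illustrative only).

        boxes: list of (label, confidence) pairs from the edge detector.
        A box is 'hard' if its confidence is below threshold_box; the image
        is a hard example if the fraction of hard boxes exceeds threshold_img.
        """
        if not boxes:
            return True  # nothing detected: offload to the cloud
        hard_boxes = [conf for _, conf in boxes if conf < threshold_box]
        return len(hard_boxes) / len(boxes) >= threshold_img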

However, due to the auto-regressive generation pattern of LLMs, we cannot afford to run inference on a hard example for several seconds at the edge and then again for several seconds in the cloud, which would lead to extremely poor latency.

The LLM collaborative algorithm we have designed is query routing, which first invokes a hard example mining algorithm to assess query difficulty and only then runs inference either in the cloud or at the edge.

Therefore, Sedna's JointInference currently offers only a hard-coded collaborative strategy suited to CV tasks, which cannot be used for collaborative inference with LLMs without modification.
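
To make the query-routing idea concrete, a hypothetical hard example mining filter that judges the difficulty of the query itself, before any inference, might be registered as shown below. The import path, alias, and length-based heuristic are assumptions for illustration only.

    from sedna.common.class_factory import ClassFactory, ClassType

    @ClassFactory.register(ClassType.HEM, alias="QueryRouting")
    class QueryRoutingFilter:
        """Hypothetical filter: decide edge vs. cloud from the raw query."""

        def __init__(self, max_edge_tokens=512, **kwargs):
            self.max_edge_tokens = max_edge_tokens

        def __call__(self, query: str) -> bool:
            # Toy heuristic: long prompts are treated as hard examples and
            # routed to the cloud; a real router could use a trained classifier.
            return len(query.split()) > self.max_edge_tokens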

1.2.2 The cloud model's ModelClient has compatibility issues with the current LLM API paradigm.

There is also an issue with the ModelClient used in Sedna's JointInference. As shown in the code above, Sedna's JointInference initializes a ModelClient defined by Sedna, as detailed in the following code: Source Code

class ModelClient:
    """Remote model service"""

    def __init__(self, service_name, version="",
                 host="127.0.0.1", port="8080", protocol="http"):
        self.server_name = f"{service_name}{version}"
        self.endpoint = f"{protocol}://{host}:{port}/{service_name}"

    def check_server_status(self):
        return http_request(url=self.endpoint, method="GET")

    def inference(self, x, **kwargs):
        """Use the remote big model server to inference."""
        json_data = deepcopy(kwargs)
        json_data.update({"data": x})
        _url = f"{self.endpoint}/predict"
        return http_request(url=_url, method="POST", json=json_data)

The code shows that the API URL and data format are hard-coded to match the InferenceServer interface defined by Sedna, which means that the cloud model must be deployed using InferenceServer to complete JointInference. Source Code

class InferenceServer(BaseServer):  # pylint: disable=too-many-arguments
    """
    rest api server for inference
    """

    def __init__(
            self,
            model,
            servername,
            host: str = '127.0.0.1',
            http_port: int = 8080,
            max_buffer_size: int = 104857600,
            workers: int = 1):
        super(
            InferenceServer,
            self).__init__(
            servername=servername,
            host=host,
            http_port=http_port,
            workers=workers)
        self.model = model
        self.max_buffer_size = max_buffer_size
        self.app = FastAPI(
            routes=[
                APIRoute(
                    f"/{servername}",
                    self.model_info,
                    response_model=ServeModelInfoResult,
                    response_class=JSONResponse,
                    methods=["GET"],
                ),
                APIRoute(
                    f"/{servername}/predict",
                    self.predict,
                    response_model=ServePredictResult,
                    response_class=JSONResponse,
                    methods=["POST"],
                ),
            ],
            log_level="trace",
            timeout=600,
        )

    def start(self):
        return self.run(self.app)

    def model_info(self):
        return ServeModelInfoResult(infos=self.get_all_urls())

    def predict(self, data: InferenceItem):
        inference_res = self.model.inference(
            data.data, post_process=data.callback)
        return ServePredictResult(result=inference_res)

However, this deployment method poses certain issues for LLMs:

  • On one hand, existing cloud-based LLMs are often too large to be deployed by users themselves;
  • On the other hand, the API defined by InferenceServer differs significantly from the OpenAI API, which has almost become a de facto standard for LLM APIs.

If users want to test cloud models such as GPT-4o, Claude, Gemini, or Moonshot-AI, they must write a forwarding service using Sedna's InferenceServer, which is complicated (especially when considering features like streaming output) and unnecessary.
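
For comparison, a cloud client that talks to an OpenAI-compatible endpoint directly, roughly what the cloud parameter proposed in Plan A below could accept, can be as small as the following sketch. The class name and parameters are hypothetical; it assumes the official openai Python package.

    from openai import OpenAI

    class APIBasedLLMClient:
        """Hypothetical cloud-model client for an OpenAI-compatible API."""

        def __init__(self, model, base_url=None, api_key=None):
            # base_url may point to OpenAI itself or to any compatible gateway.
            self.model = model
            self.client = OpenAI(base_url=base_url, api_key=api_key)

        def inference(self, query: str, **kwargs) -> str:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": query}],
                **kwargs
            )
            return response.choices[0].message.content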

2 Ianvs algorithm implementation

In order to introduce a new algorithm for Ianvs, I need to briefly outline the implementation logic of the various algorithms currently in Ianvs.

In Ianvs, ianvs/benchmarking.py serves as the entry point of the program and constructs a BenchmarkingJob:

ianvs/benchmarking.py, lines 36 to 37 in 4de73b2:

job = BenchmarkingJob(config[str.lower(BenchmarkingJob.__name__)])
job.run()

The specific logic of BenchmarkingJob is defined in ianvs/cmd/obj/benchmarkingjob.py. We can see that the test cases are created by the build_testcases() method of the testcase_controller:

self.testcase_controller.build_testcases(test_env=self.test_env,
                                         test_object=self.test_object)

build_testcases() constructs the algorithm through the _parse_algorithms_config() function:

def build_testcases(self, test_env, test_object):
    """
    Build multiple test cases by using a test environment and multiple test algorithms.
    """
    test_object_type = test_object.get("type")
    test_object_config = test_object.get(test_object_type)
    if test_object_type == TestObjectType.ALGORITHMS.value:
        algorithms = self._parse_algorithms_config(test_object_config)
        for algorithm in algorithms:
            self.test_cases.append(TestCase(test_env, algorithm))

The _parse_algorithms_config() function has a line indicating the instantiation of the Algorithm class:

algorithm = Algorithm(name, config)

In ianvs/core/testcasecontroller/algorithm/algorithm.py, the Algorithm class is declared, which returns the various algorithm instances in its paradigm() function:

if self.paradigm_type == ParadigmType.SINGLE_TASK_LEARNING.value:
    return SingleTaskLearning(workspace, **config)
if self.paradigm_type == ParadigmType.INCREMENTAL_LEARNING.value:
    return IncrementalLearning(workspace, **config)
if self.paradigm_type == ParadigmType.MULTIEDGE_INFERENCE.value:
    return MultiedgeInference(workspace, **config)
if self.paradigm_type == ParadigmType.LIFELONG_LEARNING.value:
    return LifelongLearning(workspace, **config)

These algorithm instances are defined in the files located in various folders under ianvs/core/testcasecontroller/algorithm/paradigm. These algorithm classes inherit from the ParadigmBase class and are required to have a run() function, which serves as the main entry point for training and inference.

Taking IncrementalLearning as an example, it is defined by the IncrementalLearning class in ianvs/core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py.

For inference tasks, IncrementalLearning first uses the build_paradigm_job() function to construct a job, and then calls the job's inference() interface to complete the inference.

job = self.build_paradigm_job(ParadigmType.INCREMENTAL_LEARNING.value)
inference_dataset = self.dataset.load_data(data_index_file, "inference")
inference_dataset_x = inference_dataset.x
inference_results = {}
hard_examples = []
for _, data in enumerate(inference_dataset_x):
    res, _, is_hard_example = job.inference([data])
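
By analogy, the inference loop of a new JointInference paradigm could follow the same pattern. The sketch below only illustrates how such a loop might look; ParadigmType.JOINT_INFERENCE and the four-element return value are borrowed from the demo code in section 3.1 and are not yet part of Ianvs.

    # Hypothetical inference loop for a JointInference paradigm (sketch).
    job = self.build_paradigm_job(ParadigmType.JOINT_INFERENCE.value)
    inference_dataset = self.dataset.load_data(data_index_file, "inference")

    inference_results = []
    for _, query in enumerate(inference_dataset.x):
        # Same 4-element return layout as Sedna's JointInference.inference().
        is_hard_example, res, edge_result, cloud_result = job.inference(query)
        inference_results.append(res)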

build_paradigm_job() is defined in ianvs/core/testcasecontroller/algorithm/paradigm/base.py by ParadigmBase:

if paradigm_type == ParadigmType.SINGLE_TASK_LEARNING.value:
    return self.module_instances.get(ModuleType.BASEMODEL.value)
if paradigm_type == ParadigmType.INCREMENTAL_LEARNING.value:
    return IncrementalLearning(
        estimator=self.module_instances.get(ModuleType.BASEMODEL.value),
        hard_example_mining=self.module_instances.get(
            ModuleType.HARD_EXAMPLE_MINING.value))
if paradigm_type == ParadigmType.LIFELONG_LEARNING.value:
    return LifelongLearning(
        estimator=self.module_instances.get(
            ModuleType.BASEMODEL.value),
        task_definition=self.module_instances.get(
            ModuleType.TASK_DEFINITION.value),
        task_relationship_discovery=self.module_instances.get(
            ModuleType.TASK_RELATIONSHIP_DISCOVERY.value),
        task_allocation=self.module_instances.get(
            ModuleType.TASK_ALLOCATION.value),
        task_remodeling=self.module_instances.get(
            ModuleType.TASK_REMODELING.value),
        inference_integrate=self.module_instances.get(
            ModuleType.INFERENCE_INTEGRATE.value),
        task_update_decision=self.module_instances.get(
            ModuleType.TASK_UPDATE_DECISION.value),
        unseen_task_allocation=self.module_instances.get(
            ModuleType.UNSEEN_TASK_ALLOCATION.value),
        unseen_sample_recognition=self.module_instances.get(
            ModuleType.UNSEEN_SAMPLE_RECOGNITION.value),
        unseen_sample_re_recognition=self.module_instances.get(
            ModuleType.UNSEEN_SAMPLE_RE_RECOGNITION.value)
    )
# pylint: disable=E1101
if paradigm_type == ParadigmType.MULTIEDGE_INFERENCE.value:
    return self.modules_funcs.get(ModuleType.BASEMODEL.value)()
return None

It can be seen that Ianvs's IncrementalLearning paradigm calls the IncrementalLearning interface from sedna.core.incremental_learning when constructing the paradigm job, and inference() is provided by that algorithm in Sedna.

When Sedna's IncrementalLearning class is instantiated, two parameters, estimator and hard_example_mining, are passed in. They are registered and instantiated from the BaseModel class in basemodel.py and the filter class in hard_example_mining.py, both implemented in examples/pcb-aoi/incremental_learning_bench/fault_detection/testalgorithms/fpn.

@ClassFactory.register(ClassType.GENERAL, alias="FPN")
class BaseModel:

@ClassFactory.register(ClassType.HEM, alias="IBT")
class IBTFilter(BaseFilter, abc.ABC):

Within Sedna's IncrementalLearning class, estimator inference and hard example mining are executed sequentially, just as in Sedna's JointInference.

3 Two Plans to integrate JointInference into Ianvs

Given the current implementation flaws of JointInference in Sedna and Ianvs's own logic for integrating new algorithms, there are two possible ways to introduce JointInference.

3.1 Plan A

In this approach, similar to the integration of Incremental Learning and Lifelong Learning, we build jobs based on Sedna's JointInference class. Demo Code

        if paradigm_type == ParadigmType.JOINT_INFERENCE.value:
            return JointInference(
                estimator=self.module_instances.get(
                    ModuleType.BASEMODEL.value),
                cloud=self.module_instances.get(
                    ModuleType.APIMODEL.value),
                hard_example_mining=self.module_instances.get(
                    ModuleType.HARD_EXAMPLE_MINING.value)
            )

This method requires some modifications to Sedna's JointInference to address the two problems previously mentioned:

  • Introduce a new parameter cloud to the constructor of the JointInference class, allowing users to pass a self-designed model client instance. This resolves the issue where LLMs currently need an unnecessary forwarding service. Since cloud is an optional parameter, it will not affect existing joint inference examples in Sedna. Demo Code
    def __init__(self, estimator=None, cloud=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
  • Introduce a mining_mode parameter for the inference() function of the JointInference class. Based on the mining mode, the corresponding process is constructed, addressing the issue that the current collaborative process is not compatible with LLMs. I have implemented a simple demo that builds two modes: "inference-then-mining" for the Object Detection collaborative strategy and "mining-then-inference" for the LLM query routing strategy. Demo Code
        mining_mode = kwargs.get("mining_mode", "inference-then-mining")

        if mining_mode == "inference-then-mining":
            res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(res)
            if is_hard_example:
                res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)

        elif mining_mode == "mining-then-inference":
            # First conduct hard example mining on the input itself, and then
            # decide whether to execute on the edge or in the cloud.
            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(data)
            if is_hard_example:
                if not kwargs.get("speculative_decoding", False):
                    res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)
                else:
                    # do speculative_decoding
                    pass
            else:
                res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

        else:
            raise ValueError(
                "Mining Mode must be in ['mining-then-inference', 'inference-then-mining']"
            )

        return [is_hard_example, res, edge_result, cloud_result]

3.1.1 Advantages

  • Introduces Sedna's JointInference feature to the Ianvs core; the modified class may also be contributed back to Sedna to support LLM joint inference.

3.1.2 Drawbacks

  • The collaborative strategy will be hard-coded into the core. If users want to implement a new collaborative strategy themselves, they will need to modify the Ianvs core (however, we could consider adding a custom module, similar to BASEMODEL and UNSEEN_TASK_ALLOCATION, allowing users to define it in examples).

3.2 Plan B

In addition to paradigms like Incremental Learning and Lifelong Learning that directly call Sedna's interfaces, Ianvs also includes paradigms such as Single Task Learning, which do not include any details about training and inference; those details are entirely handled by the BaseModel class found in the examples/ directory, as follows:

if paradigm_type == ParadigmType.SINGLE_TASK_LEARNING.value:
    return self.module_instances.get(ModuleType.BASEMODEL.value)

In Plan B, we can introduce JointInference in a manner similar to SingleTaskLearning. It does not include any specific strategy for collaborative algorithms. The newly added JointInference paradigm serves merely as an entry point for calling a collaborative algorithm, and its code is almost identical to that of SingleTaskLearning but without the training step. The specific collaborative strategy will be written in the predict() function of the BaseModel found in examples/, as sketched below.
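
A minimal sketch of what such a BaseModel might look like under Plan B is given below; the alias, the routing heuristic, and the edge_infer / cloud_infer placeholders are all hypothetical and not existing Ianvs or Sedna APIs.

    from sedna.common.class_factory import ClassFactory, ClassType

    @ClassFactory.register(ClassType.GENERAL, alias="EdgeCloudLLM")
    class BaseModel:
        """Hypothetical Plan B base model: the collaborative strategy lives
        entirely in predict(), inside examples/."""

        def __init__(self, max_edge_tokens=512, **kwargs):
            self.max_edge_tokens = max_edge_tokens

        def predict(self, data, **kwargs):
            results = []
            for query in data:
                # Query routing: assess difficulty first, then pick a model.
                if len(query.split()) > self.max_edge_tokens:
                    results.append(self.cloud_infer(query))
                else:
                    results.append(self.edge_infer(query))
            return results

        def edge_infer(self, query):
            # Placeholder for a local small-model call (e.g. via vLLM).
            raise NotImplementedError

        def cloud_infer(self, query):
            # Placeholder for an OpenAI-compatible API call, as sketched earlier.
            raise NotImplementedError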

3.2.1 Advantages

  • Defining custom collaborative algorithms will be more convenient, allowing users to customize the collaboration process directly within the example.

3.2.2 Drawbacks

  • Since the newly added JointInference class in the paradigm does not contain any details of collaborative algorithms, whether it can be considered a new feature of Ianvs needs to be discussed;
  • Because the details of collaborative algorithms are entirely implemented by users, introducing Sedna's JointInference becomes unnecessary.

Which plan should we adopt? If there are any other options, please feel free to suggest them.

@hsj576
Member

hsj576 commented Aug 7, 2024

(quoted @FuryMartin's comment above in full)

I prefer Plan A. Overall it looks good to me.

… fix math typo

Signed-off-by: Yu Fan <fany@buaa.edu.cn>
@FuryMartin
Contributor Author

Considering that Plan A aligns better with the Sedna interface and existing algorithms, I have updated the proposal based on this plan and created some draft code.

You can find more details by clicking these links: proposal and draft code.

@MooreZheng @hsj576

@hsj576
Member

hsj576 commented Aug 7, 2024

/lgtm

Collaborator

@MooreZheng MooreZheng left a comment


The design looks good to me already. A brand-new feature of joint inference is added to Ianvs.

It would be further appreciated if there were any thoughts on the previously mentioned idea of "adding a custom module, similar to BASEMODEL and UNSEEN_TASK_ALLOCATION, allowing users to define it in examples".

@MooreZheng
Collaborator

/lgtm

Member

@hsj576 hsj576 left a comment


/lgtm

@kubeedge-bot added the lgtm (Indicates that a PR is ready to be merged.) label on Aug 27, 2024
@kubeedge-bot
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hsj576, MooreZheng

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@kubeedge-bot added the approved (Indicates a PR has been approved by an approver from all required OWNERS files.) label on Aug 27, 2024
@kubeedge-bot kubeedge-bot merged commit 2e60f64 into kubeedge:main Aug 27, 2024
10 of 13 checks passed