doc: add comments and update quick guide
Signed-off-by: Yu Fan <fany@buaa.edu.cn>
FuryMartin committed Oct 28, 2024
1 parent 0c5f643 commit 520d514
Showing 26 changed files with 1,082 additions and 225 deletions.
@@ -1,4 +1,4 @@
-# Copyright 2022 The KubeEdge Authors.
+# Copyright 2024 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@

"""Cloud-Edge Joint Inference"""

-# Ianvs imports
import os
from tqdm import tqdm

@@ -43,8 +42,8 @@ class JointInference(ParadigmBase):
workspace: string
the output required for multi-edge inference paradigm.
kwargs: dict
-config required for the test process of multi-edge inference paradigm,
-e.g.: algorithm modules, dataset, initial model, etc.
+config required for the test process of joint inference paradigm,
+e.g.: hard_example_mining_mode
"""

@@ -58,12 +57,15 @@ def __init__(self, workspace, **kwargs):
)

def set_config(self):
"""Configure output_dir, dataset, modules
""" Set the configuration for the joint inference paradigm.
Raises:
KeyError: Required Modules are not fully loaded.
Raises
------
KeyError
If required modules are not provided.
"""


inference_output_dir = os.path.dirname(self.workspace)
os.environ["RESULT_SAVED_URL"] = inference_output_dir
os.makedirs(inference_output_dir, exist_ok=True)
@@ -98,12 +100,12 @@ def set_config(self):

def run(self):
"""
-run the test flow of multi-edge inference paradigm.
+run the test flow of joint inference paradigm.
Returns
------
-test result: numpy.ndarray
-system metric info: dict
+inference_result: list
+system_metric_info: dict
information needed to compute system metrics.
"""
@@ -118,21 +120,41 @@ def run(self):
return inference_result, self.system_metric_info

def _cleanup(self, job):
"""Call module's cleanup method to release resources
Parameters
----------
job : Sedna JointInference
Sedna JointInference API
"""

LOGGER.info("Release models")
# release module resources
for module in self.module_instances.values():
if hasattr(module, "cleanup"):
module.cleanup()

-# Since the hard example mining module is instantiated within the job,
-# special call is required.
+# Special call is required for hard example mining module
+# since it is instantiated within the job.
mining_instance = job.hard_example_mining_algorithm
if hasattr(mining_instance, "cleanup"):
mining_instance.cleanup()

del job
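    # Note: _cleanup relies on a duck-typed contract. A module instance may
    # optionally expose a cleanup() method, which is called only when present:
    #
    #     class EdgeModel:
    #         def cleanup(self):
    #             ...  # e.g. release GPU memory, close clients, flush caches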

def _inference(self, job):
"""Inference each data in Inference Dataset
Parameters
----------
job : Sedna JointInference
Sedna JointInference API
Returns
-------
list
    Inference results, each in the format `(is_hard_example, res, edge_result, cloud_result)`.
"""
results = []

cloud_count, edge_count = 0, 0
@@ -163,4 +185,4 @@ def _inference(self, job):

LOGGER.info("Inference Finished")

-return results # (is_hard_example, res, edge_result, cloud_result)
+return results
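For illustration, a minimal sketch (with hypothetical entries) of how the returned list can be consumed downstream, assuming non-hard examples are the ones served by the edge model:

results = [
    (False, "4", {"completion_tokens": 12}, None),
    (True, "9", None, {"completion_tokens": 20}),
]

# Edge Ratio: fraction of queries answered by the edge model
edge_ratio = sum(1 for is_hard, *_ in results if not is_hard) / len(results)
print(f"Edge Ratio: {edge_ratio:.2%}")  # 50.00% for the sample above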
95 changes: 64 additions & 31 deletions examples/cloud-edge-collaborative-inference-for-llm/README.md

Large diffs are not rendered by default.

@@ -61,7 +61,7 @@ benchmarkingjob:
# 1> "all": select all metrics in the leaderboard;
# 2> metrics in the leaderboard, e.g., "f1_score"
# metrics: [ "acc" , "edge-rate", "cloud-prompt", "cloud-completion", "edge-prompt", "edge-completion", "input-throughput", "output-throughput", "latency"]
metrics: ["Accuracy", "Rate to Edge", "Time to First Token", "Throughput", "Internal Token Latency", "Cloud Prompt Tokens", "Cloud Completion Tokens", "Edge Prompt Tokens", "Edge Completion Tokens"]
metrics: ["Accuracy", "Edge Ratio", "Time to First Token", "Throughput", "Internal Token Latency", "Cloud Prompt Tokens", "Cloud Completion Tokens", "Edge Prompt Tokens", "Edge Completion Tokens"]

# model of save selected and all dataitems in workspace; string type;
# currently the options of value are as follows:
@@ -0,0 +1,61 @@
import numpy as np

import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

colors = plt.cm.Paired.colors  # Paired color palette
plt.rcParams["axes.prop_cycle"] = plt.cycler("color", colors)

# a sigmoid function to fit non-oracle models' performance vs cost
def sigmoid_fit(x, L, k, x0):
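    # L: accuracy ceiling; k: growth rate; x0: cost at the curve's midpoint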
return L / (1 + np.exp(-k * (x - x0)))

def plot_accuracy_cost(models, costs, accuracy, non_oracle_costs, non_oracle_accuracy):
# Fit the sigmoid model
params_sigmoid, _ = curve_fit(sigmoid_fit, non_oracle_costs, non_oracle_accuracy, p0=[100, 1, 0.2])

# Generate points for the sigmoid fitted curve
curve_x_sigmoid = np.linspace(min(non_oracle_costs), max(non_oracle_costs), 100)
curve_y_sigmoid = sigmoid_fit(curve_x_sigmoid, *params_sigmoid)

plt.figure(figsize=(10, 6))

# Plot all models
for i in range(len(models)):
if "Oracle" in models[i]:
marker = '^' # Triangle marker for Oracle models
else:
marker = 'o' # Circle marker for non-Oracle models
plt.scatter(costs[i], accuracy[i], label=models[i], marker=marker)

# Plot the sigmoid fitted curve
plt.plot(curve_x_sigmoid, curve_y_sigmoid, 'gray', linestyle='dashed') # Gray dashed line for the curve

plt.title('Model Performance vs Cost')
plt.xlabel('Cost($/M token)')
plt.ylabel('Accuracy (%)')
plt.legend(title='Model Name')
plt.grid(True)
plt.savefig('model_performance_sigmoid_fitted_curve.png', dpi=300)
plt.show()

if __name__ == '__main__':
models = [
"Oracle-Qwen2.5-7b-instruct + gpt-4o-mini",
"Oracle-Qwen2.5-1.5b-instruct + gpt-4o-mini",
"Oracle-Qwen2.5-3b-instruct + gpt-4o-mini",
"gpt-4o-mini",
"Qwen2.5-7B-Instruct",
"Qwen2.5-3B-Instruct",
"Qwen2.5-1.5B-Instruct"
]
# The Oracle Routed Model's cost is a weighted average of the edge and cloud
# model costs, weighted by the Edge Ratio.
# The edge model's cost is estimated from its parameter size.
costs = [0.16, 0.18, 0.17, 0.60, 0.10, 0.08, 0.05]
accuracy = [84.22, 82.75, 82.22, 75.99, 71.84, 60.3, 58.35]

# Non Oracle Models: gpt-4o-mini, Qwen2.5-7B-Instruct, Qwen2.5-3B-Instruct, Qwen2.5-1.5B-Instruct
non_oracle_costs = costs[-4:] # Costs in $/M token
non_oracle_accuracy = accuracy[-4:] # Accuracies in %

plot_accuracy_cost(models, costs, accuracy, non_oracle_costs, non_oracle_accuracy)
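As a sanity check on the weighted-cost comment above, a small sketch with a hypothetical Edge Ratio (not a measured benchmark value):

edge_cost = 0.10    # $/M token, Qwen2.5-7B-Instruct (estimated from parameter size)
cloud_cost = 0.60   # $/M token, gpt-4o-mini
edge_ratio = 0.89   # hypothetical fraction of queries served on the edge

oracle_cost = edge_ratio * edge_cost + (1 - edge_ratio) * cloud_cost
print(f"{oracle_cost:.3f} $/M token")  # 0.155 with these inputs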
@@ -1,4 +1,4 @@
-# Copyright 2022 The KubeEdge Authors.
+# Copyright 2024 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,27 +15,57 @@
from __future__ import absolute_import, division, print_function

import os

from core.common.log import LOGGER
from sedna.common.class_factory import ClassType, ClassFactory

from models import APIBasedLLM
device = "cuda" # the device to load the model onto


os.environ['BACKEND_TYPE'] = 'TORCH'

__all__ = ["BaseModel"]

@ClassFactory.register(ClassType.GENERAL, alias="CloudModel")
class CloudModel:
"""Models being deployed on the Cloud
"""
def __init__(self, **kwargs):
"""Initialize the CloudModel. See `APIBasedLLM` for details about `kwargs`.
"""
LOGGER.info(kwargs)
self.model = APIBasedLLM(**kwargs)
-self.model.load(model = kwargs.get("model", "gpt-4o-mini"))
+self.load(kwargs.get("model", "gpt-4o-mini"))

-def inference(self, data, input_shape=None, **kwargs):

def load(self, model):
"""Set the model.
Parameters
----------
model : str
Existing model from your OpenAI provider. Example: `gpt-4o-mini`
"""
self.model._load(model = model)

def inference(self, data, **kwargs):
"""Inference the model with the given data.
Parameters
----------
data : dict
The data to be used for inference. See format at BaseLLM's `inference()`.
kwargs : dict
To align with Sedna's JointInference interface.
Returns
-------
dict
Formatted Response. See `model._format_response()` for more details.
"""

return self.model.inference(data)

def cleanup(self):
"""Save the cache and cleanup the model.
"""

self.model.save_cache()
self.model.cleanup()
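A minimal usage sketch, assuming an OpenAI-compatible endpoint is configured for APIBasedLLM (e.g. via API-key environment variables); the query dict below is a hypothetical input, see BaseLLM's `inference()` for the actual format:

cloud = CloudModel(model="gpt-4o-mini")
response = cloud.inference({"query": "What is the capital of France?"})
cloud.cleanup()  # saves the response cache and releases the model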
@@ -1,12 +1,39 @@
-import numpy as np
# Copyright 2024 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from sedna.common.class_factory import ClassFactory, ClassType
from sedna.datasources import BaseDataSource

@ClassFactory.register(ClassType.GENERAL, alias="OracleRouterDatasetProcessor")
class OracleRouterDatasetProcessor:
""" A Customized Dataset Processor for Oracle Router"""
def __init__(self, **kwargs):
pass

def __call__(self, dataset):
"""Transform the dataset to another format for Oracle Router
Parameters
----------
dataset : sedna.datasources.BaseDataSource
The dataset loaded by Sedna
Returns
-------
sedna.datasources.BaseDataSource
Transformed dataset
"""
dataset.x = [{"query": x, "gold": y} for x,y in zip(dataset.x, dataset.y)]
return dataset
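A quick sketch of the transformation with a stand-in data source; in Ianvs the input is a `sedna.datasources.BaseDataSource`:

class FakeDataSource:
    def __init__(self, x, y):
        self.x, self.y = x, y

dataset = FakeDataSource(x=["What is 2 + 2?"], y=["4"])
processed = OracleRouterDatasetProcessor()(dataset)
print(processed.x)  # [{'query': 'What is 2 + 2?', 'gold': '4'}]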
