Create topic before publishing prediction log (#556)
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
Currently, if the Kafka brokers do not support auto topic creation,
PyfuncV3Model will fail to publish the prediction log because the topic
is missing. This PR adds a step to ensure that the topic exists before
the message is published.
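
For illustration, a minimal sketch of the failure mode, assuming a broker
with `auto.create.topics.enable=false` (the broker address is borrowed from
the test fixtures below; the topic name is hypothetical):

```python
# Hypothetical repro: publish to a topic that does not exist while the
# brokers have auto topic creation disabled (assumed broker setting).
from confluent_kafka import Producer

def on_delivery(err, msg):
    # Without this PR's fix, delivery fails here, typically with
    # KafkaError.UNKNOWN_TOPIC_OR_PART.
    if err is not None:
        print(f"delivery failed: {err}")

producer = Producer({"bootstrap.servers": "kafka-broker:1111"})
producer.produce("missing-prediction-log-topic", b"{}", on_delivery=on_delivery)
producer.flush()
```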

# Modifications
<!-- Summarize the key code changes. -->
Create the topic during publisher initialization, as sketched below.
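
Condensed, the new initialization step looks like this sketch (the real
change runs inline in the publisher's `__init__`; see the
`pyfuncserver/publisher/kafka.py` diff below — `ensure_topic` is a
hypothetical helper name used here for readability):

```python
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

def ensure_topic(conf: dict, topic: str, num_partitions: int, replication_factor: int) -> None:
    # Create the topic up front so publishing works even when the brokers
    # have auto topic creation disabled.
    admin_client = AdminClient(conf)
    futures = admin_client.create_topics(
        [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)]
    )
    try:
        futures[topic].result()  # block until the broker confirms creation
    except KafkaException as e:
        # Another replica may have created the topic first; that is fine.
        if e.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
            raise
```

Creation is idempotent from the publisher's point of view: every replica
attempts it at startup, and `TOPIC_ALREADY_EXISTS` is swallowed so only
genuine failures propagate.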

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [ ] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Updated Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
khorshuheng authored Mar 26, 2024
1 parent ca68bc2 commit 2c9eefa
Showing 5 changed files with 48 additions and 16 deletions.
18 changes: 11 additions & 7 deletions api/cluster/resource/templater.go
@@ -81,13 +81,15 @@ const (
defaultGRPCPort = 9000
defaultPredictorPort = 80

envPublisherKafkaTopic = "PUBLISHER_KAFKA_TOPIC"
envPublisherKafkaBrokers = "PUBLISHER_KAFKA_BROKERS"
envPublisherEnabled = "PUBLISHER_ENABLED"
envPublisherKafkaLinger = "PUBLISHER_KAFKA_LINGER_MS"
envPublisherKafkaAck = "PUBLISHER_KAFKA_ACKS"
envPublisherSamplingRatio = "PUBLISHER_SAMPLING_RATIO"
envPublisherKafkaConfig = "PUBLISHER_KAFKA_CONFIG"
envPublisherKafkaTopic = "PUBLISHER_KAFKA_TOPIC"
envPublisherNumPartitions = "PUBLISHER_KAFKA_NUM_PARTITIONS"
envPublisherReplicationFactor = "PUBLISHER_KAFKA_REPLICATION_FACTOR"
envPublisherKafkaBrokers = "PUBLISHER_KAFKA_BROKERS"
envPublisherEnabled = "PUBLISHER_ENABLED"
envPublisherKafkaLinger = "PUBLISHER_KAFKA_LINGER_MS"
envPublisherKafkaAck = "PUBLISHER_KAFKA_ACKS"
envPublisherSamplingRatio = "PUBLISHER_SAMPLING_RATIO"
envPublisherKafkaConfig = "PUBLISHER_KAFKA_CONFIG"

grpcHealthProbeCommand = "grpc_health_probe"
)
@@ -336,6 +338,8 @@ func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Ser
pyfuncPublisherCfg := t.deploymentConfig.PyFuncPublisher
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherNumPartitions, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.NumPartitions)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherReplicationFactor, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.ReplicationFactor)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)})
19 changes: 14 additions & 5 deletions api/cluster/resource/templater_test.go
@@ -161,10 +161,12 @@ var (

pyfuncPublisherConfig = config.PyFuncPublisherConfig{
Kafka: config.KafkaConfig{
Brokers: "kafka-broker:1111",
LingerMS: 1000,
Acks: 0,
AdditionalConfig: "{}",
Brokers: "kafka-broker:1111",
LingerMS: 1000,
Acks: 0,
NumPartitions: 24,
ReplicationFactor: 3,
AdditionalConfig: "{}",
},
SamplingRatioRate: 0.1,
}
@@ -1916,7 +1918,6 @@ func TestCreateInferenceServiceSpec(t *testing.T) {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tt.exp, infSvcSpec)
})
@@ -4428,6 +4429,14 @@ func createPyFuncPublisherEnvVars(svc *models.Service, pyfuncPublisher config.Py
Name: envPublisherKafkaTopic,
Value: svc.GetPredictionLogTopicForVersion(),
},
models.EnvVar{
Name: envPublisherNumPartitions,
Value: fmt.Sprintf("%d", pyfuncPublisher.Kafka.NumPartitions),
},
models.EnvVar{
Name: envPublisherReplicationFactor,
Value: fmt.Sprintf("%d", pyfuncPublisher.Kafka.ReplicationFactor),
},
models.EnvVar{
Name: envPublisherKafkaBrokers,
Value: pyfuncPublisher.Kafka.Brokers,
2 changes: 2 additions & 0 deletions api/config/config.go
@@ -344,6 +344,8 @@ type KafkaConfig struct {
ConnectTimeoutMS int `validate:"required" default:"1000"`
SerializationFmt string `validate:"required" default:"protobuf"`
LingerMS int `validate:"required" default:"100"`
NumPartitions int `validate:"required" default:"24"`
ReplicationFactor int `validate:"required" default:"3"`

AdditionalConfig string `validate:"required" default:"{}"`
}
8 changes: 7 additions & 1 deletion python/pyfunc-server/pyfuncserver/config.py
@@ -30,6 +30,8 @@
PUBLISHER_KAFKA_BROKERS = ("PUBLISHER_KAFKA_BROKERS", "")
PUBLISHER_KAKFA_LINGER_MS = ("PUBLISHER_KAFKA_LINGER_MS", 1000)
PUBLISHER_KAFKA_ACKS = ("PUBLISHER_KAFKA_ACKS", 0)
PUBLISHER_KAFKA_NUM_PARTITIONS = ("PUBLISHER_KAFKA_NUM_PARTITIONS", 24)
PUBLISHER_KAFKA_REPLICATION_FACTOR = ("PUBLISHER_KAFKA_REPLICATION_FACTOR", 3)
PUBLISHER_KAFKA_CONFIG = ("PUBLISHER_KAFKA_CONFIG", "{}")
PUBLISHER_SAMPLING_RATIO = ("PUBLISHER_SAMPLING_RATIO", 0.01)
PUBLISHER_ENABLED = ("PUBLISHER_ENABLED", "false")
@@ -74,6 +76,8 @@ class Publisher:
sampling_ratio: float
enabled: bool
kafka: Kafka
num_partitions: int
replication_factor: int


class Config:
@@ -130,14 +134,16 @@ def __init__(self, model_dir: str):
raise ValueError("kafka brokers must be set")
kafka_linger_ms = int(os.getenv(*PUBLISHER_KAKFA_LINGER_MS))
kafka_acks = int(os.getenv(*PUBLISHER_KAFKA_ACKS))
num_partitions = int(os.getenv(*PUBLISHER_KAFKA_NUM_PARTITIONS))
replication_factor = int(os.getenv(*PUBLISHER_KAFKA_REPLICATION_FACTOR))
kafka_cfgs = self._kafka_config()
kafka = Kafka(
kafka_topic,
kafka_brokers,
kafka_linger_ms,
kafka_acks,
kafka_cfgs)
self.publisher = Publisher(sampling_ratio, publisher_enabled, kafka)
self.publisher = Publisher(sampling_ratio, publisher_enabled, kafka, num_partitions, replication_factor)


def _kafka_config(self):
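For a local run of the pyfunc server, the two new knobs sit alongside the
existing publisher variables. A hypothetical setup using the defaults from
`config.py` above (topic and broker values are placeholders):

```python
import os

# Existing publisher settings (placeholder values).
os.environ["PUBLISHER_ENABLED"] = "true"
os.environ["PUBLISHER_KAFKA_TOPIC"] = "sample-model-prediction-log"
os.environ["PUBLISHER_KAFKA_BROKERS"] = "kafka-broker:1111"

# New in this PR: topic layout used when the topic has to be created.
os.environ["PUBLISHER_KAFKA_NUM_PARTITIONS"] = "24"       # default
os.environ["PUBLISHER_KAFKA_REPLICATION_FACTOR"] = "3"    # default
```

In the Kubernetes deployment these are injected by the templater change in
`api/cluster/resource/templater.go` above, sourced from the `KafkaConfig`
defaults in `api/config/config.go`.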
17 changes: 14 additions & 3 deletions python/pyfunc-server/pyfuncserver/publisher/kafka.py
@@ -1,9 +1,8 @@
import uuid

from pyfuncserver.config import Publisher as PublisherConfig, ModelManifest
from pyfuncserver.utils.converter import build_prediction_log

from confluent_kafka import Producer
from confluent_kafka import KafkaError, KafkaException, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from merlin.pyfunc import PyFuncOutput


@@ -18,6 +17,18 @@ def __init__(self, publisher_config: PublisherConfig, model_manifest: ModelManif
self.producer = Producer(**conf)
self.topic = publisher_config.kafka.topic
self.model_manifest = model_manifest
admin_client = AdminClient(conf)
try:
admin_client.create_topics([
NewTopic(
self.topic,
num_partitions=publisher_config.num_partitions,
replication_factor=publisher_config.replication_factor
)
])[self.topic].result()
except KafkaException as e:
if e.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
raise e

def produce(self, data: PyFuncOutput):
prediction_log = build_prediction_log(pyfunc_output=data, model_manifest=self.model_manifest)
