Commit 95519a1: Remove extra logs

krishung5 committed Oct 3, 2024
1 parent 8b17fbd commit 95519a1
Showing 3 changed files with 31 additions and 61 deletions.
53 changes: 30 additions & 23 deletions src/pb_stub.cc
@@ -719,22 +719,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
         response_batch.value().data_.get() + sizeof(IPCMessageShm));
 
-    // Handle two special cases:
-    // 1. For default(non-decoupled) mode, where the response
-    // factory should already be cleaned up with the previous response sent
-    // from response sender, and yet the model tries to return another
-    // response from `execute()` function. Notify the backend to NOT to
-    // 2.The response sender is already closed, need to notify the backend to
-    // NOT to delete the response factory again during error handling.
-    // std::string error_string = pb_exception.what();
-    if ((err_message.find(
-            "Non-decoupled model cannot send more than one response") !=
-         std::string::npos) ||
-        (err_message.find("Response sender has been closed") !=
-         std::string::npos)) {
+    // If the response sender is already closed, notify the backend NOT to
+    // delete the response factory again during error handling.
+    if (err_message.find("Response sender has been closed") !=
+        std::string::npos) {
       response_batch_shm_ptr->is_response_factory_deleted = true;
-      LOG_ERROR << "=== caught error: " << err_message;
     }
 
     response_batch_shm_ptr->has_error = true;
@@ -752,8 +742,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     }
   } else {
     if (!response_batch) {
-      // No response is returned from `execute()`.
-      std::cerr << "===== response_batch is not set" << std::endl;
       response_batch = shm_pool_->Construct<char>(
           sizeof(ResponseBatch) + sizeof(IPCMessageShm));
       ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
@@ -764,8 +752,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
           response_batch.value().data_.get() + sizeof(IPCMessageShm));
       response_batch_shm_ptr->has_error = false;
       response_batch_shm_ptr->is_error_set = false;
-      std::cerr << "===== response_batch_shm_ptr->batch_size: "
-                << response_batch_shm_ptr->batch_size << std::endl;
     }
 
     execute_response = IPCMessage::Create(
@@ -865,8 +851,32 @@ Stub::ProcessReturnedResponses(
       }
 
       InferResponse* response = py_responses[i].cast<InferResponse*>();
-      request->GetResponseSender()->UpdateStateAndCounters(
-          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      try {
+        request->GetResponseSender()->UpdateStateAndCounters(
+            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      }
+      catch (const PythonBackendException& pb_exception) {
+        // Handle the exception here to catch the error when there's a response
+        // returned from `execute()`, and the below error message is thrown.
+        // In default (non-decoupled) mode, the response factory should already
+        // have been cleaned up when the previous response was sent by the
+        // response sender. However, if the model attempts to return another
+        // response from the `execute()` function, notify the backend NOT to
+        // delete the response factory again during error handling.
+        std::string err_message = pb_exception.what();
+        if (err_message.find(
+                "Non-decoupled model cannot send more than one response") !=
+            std::string::npos) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+          ResponseBatch* response_batch_shm_ptr =
+              reinterpret_cast<ResponseBatch*>(
+                  response_batch.value().data_.get() + sizeof(IPCMessageShm));
+          response_batch_shm_ptr->batch_size = 0;
+          response_batch_shm_ptr->is_response_factory_deleted = true;
+        }
+        throw pb_exception;
+      }
     }
   }
   // Return all the created responses using response_batch. The reason
@@ -883,18 +893,15 @@ Stub::ProcessReturnedResponses(
       reinterpret_cast<bi::managed_external_buffer::handle_t*>(
           response_batch.value().data_.get() + sizeof(ResponseBatch) +
           sizeof(IPCMessageShm));
-  std::cerr << "===== response_size: " << responses_size << std::endl;
   for (size_t i = 0; i < responses_size; i++) {
     // Check the return type of execute function.
     InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
     InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
     if (!py::isinstance<py::none>(py_responses[i])) {
-      std::cerr << "===== response is NOT None" << std::endl;
       infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
       ProcessResponse(infer_response);
       responses_shm_handle[i] = infer_response->ShmHandle();
     } else {
-      std::cerr << "===== response is None" << std::endl;
       responses_shm_handle[i] = 0;
     }
   }
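The try/catch added above keys its cleanup decision off the exception text rather than a dedicated error code: it matches a known message substring, records a flag in the shared-memory response batch so the backend skips a second teardown, and rethrows. Below is a minimal, self-contained sketch of that pattern; CleanupError, SharedFlags, and send_one_response are simplified stand-ins invented for illustration, not types from the Python backend.

#include <iostream>
#include <stdexcept>
#include <string>

// Stand-in for PythonBackendException (hypothetical simplification).
struct CleanupError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Stand-in for the flags the stub writes into shared memory.
struct SharedFlags {
  bool has_error = false;
  bool is_response_factory_deleted = false;
};

// Simulates a response sender that only allows one response.
void send_one_response(int& sent_count) {
  if (++sent_count > 1) {
    throw CleanupError(
        "Non-decoupled model cannot send more than one response");
  }
}

int main() {
  SharedFlags flags;
  int sent_count = 0;
  try {
    send_one_response(sent_count);  // ok
    send_one_response(sent_count);  // throws
  }
  catch (const CleanupError& e) {
    std::string err_message = e.what();
    // Match on the known error text, as the commit does, so the peer
    // process knows the factory was already torn down and must not
    // delete it a second time.
    if (err_message.find(
            "Non-decoupled model cannot send more than one response") !=
        std::string::npos) {
      flags.is_response_factory_deleted = true;
    }
    flags.has_error = true;
  }
  std::cout << "has_error=" << flags.has_error
            << " factory_deleted=" << flags.is_response_factory_deleted
            << std::endl;
  return 0;
}

Matching on message text is brittle if the wording ever changes; the commit mitigates that by matching the exact strings the response sender throws.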
34 changes: 1 addition & 33 deletions src/python_be.cc
@@ -843,8 +843,6 @@ ModelInstanceState::ProcessCleanupRequest(
     infer_payload_.erase(id);
   } else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) {
     // Delete response factory
-    std::cerr << "=== ResponseFactoryDeleter -> ProcessCleanupRequest ==="
-              << std::endl;
     std::unique_ptr<
         TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
         response_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(id));
@@ -1165,8 +1163,6 @@ ModelInstanceState::ResponseSendDecoupled(
     TRITONBACKEND_ResponseFactory* response_factory =
         reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
             send_message_payload->response_factory_address);
-    std::cerr << "=== ResponseFactoryDeleter -> ResponseSendDecoupled ==="
-              << std::endl;
     std::unique_ptr<
         TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
         lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
@@ -1366,20 +1362,11 @@ ModelInstanceState::ProcessRequests(
   reporter.SetBatchStatistics(total_batch_size);
 
   if (response_batch_shm_ptr->has_error) {
-    // The "is_response_factory_deleted" flag indicates whether the response
-    // factory has been deleted. The flag is used in a corner case
-    // where after the response sender sends a response and complete final flag,
-    // and closes the response factory, the model returns a response from
-    // `execute()`. For both default and decoupled mode, upon handling that
-    // error, no need to delete the response factory.
     if (!response_batch_shm_ptr->is_response_factory_deleted) {
       for (uint32_t r = 0; r < request_count; r++) {
         TRITONBACKEND_ResponseFactory* response_factory =
             reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
                 pb_infer_requests[r]->GetResponseFactoryAddress());
-        std::cerr << "=== ResponseFactoryDeleter -> "
-                     "response_batch_shm_ptr->has_error ==="
-                  << std::endl;
         std::unique_ptr<
             TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
             lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
@@ -1411,7 +1398,6 @@ ModelInstanceState::ProcessRequests(
       // usage of response sender, so only create a TRITONBACKEND_Response
       // object for the valid responses, and skip the None responses later.
       if (response_shm_handle[i] == 0) {
-        std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl;
         responses->emplace_back(nullptr);
       } else {
         TRITONBACKEND_Response* response;
@@ -1434,18 +1420,15 @@ ModelInstanceState::ProcessRequests(
       gpu_output_buffers(request_count);
   GPUBuffersHelper gpu_buffer_helper;
 
-  std::cerr << "=== PYBE request_count: " << request_count << std::endl;
   for (uint32_t r = 0; r < request_count; ++r) {
     NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
+    requires_deferred_callback.push_back(false);
     if (response_shm_handle[r] == 0) {
-      std::cerr << "=== PYBE skip the response_shm_handle is 0 ==="
-                << std::endl;
       continue;
     }
     TRITONBACKEND_Response* response = (*responses)[r];
     TRITONBACKEND_Request* request = requests[r];
     uint32_t requested_output_count = 0;
-    requires_deferred_callback.push_back(false);
 
     shm_responses.emplace_back(nullptr);
     std::unique_ptr<InferResponse>& infer_response = shm_responses.back();
@@ -1459,21 +1442,10 @@ ModelInstanceState::ProcessRequests(
       (*responses)[r] = nullptr;
       continue;
     }
-
-    // if (response_shm_handle[r] == 0) {
-    //   std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl;
-    //   LOG_IF_ERROR(
-    //       TRITONBACKEND_ResponseDelete((*responses)[r]),
-    //       "failed to delete response");
-    //   (*responses)[r] = nullptr;
-    //   continue;
-    // }
     {
       TRITONBACKEND_ResponseFactory* response_factory =
           reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
              pb_infer_requests[r]->GetResponseFactoryAddress());
-      std::cerr << "=== ResponseFactoryDeleter -> regular workflow ==="
-                << std::endl;
      std::unique_ptr<
          TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
          lresponse_factory(
@@ -1522,17 +1494,13 @@ ModelInstanceState::ProcessRequests(
     GUARDED_RESPOND_IF_ERROR(
         responses, r,
         TRITONBACKEND_RequestOutputCount(request, &requested_output_count));
-    std::cerr << "=== PYBE requested_output_count: " << requested_output_count
-              << std::endl;
     std::set<std::string> requested_output_names;
     for (size_t j = 0; j < requested_output_count; ++j) {
       const char* output_name;
       GUARDED_RESPOND_IF_ERROR(
           responses, r,
           TRITONBACKEND_RequestOutputName(request, j, &output_name));
       requested_output_names.insert(output_name);
-      std::cerr << "=== PYBE requested_output_name: " << output_name
-                << std::endl;
     }
 
     bool require_deferred_callback = false;
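Every deletion site above wraps the raw TRITONBACKEND_ResponseFactory* in a std::unique_ptr with backend::ResponseFactoryDeleter, so the factory is released through the C API exactly once when the pointer leaves scope, even on early continue or exception paths. A generic sketch of that idiom follows; the fake_factory handle API is invented for illustration and stands in for the real Triton calls.

#include <iostream>
#include <memory>

// Invented stand-ins for a C-style handle API (not the real Triton calls).
struct fake_factory { int id; };
fake_factory* fake_factory_new(int id) { return new fake_factory{id}; }
void fake_factory_delete(fake_factory* f) {
  std::cout << "deleting factory " << f->id << std::endl;
  delete f;
}

// Mirrors the role of backend::ResponseFactoryDeleter: a function object
// that frees the handle through the C API instead of operator delete.
struct FactoryDeleter {
  void operator()(fake_factory* f) const {
    if (f != nullptr) {
      fake_factory_delete(f);
    }
  }
};

int main() {
  // In the backend the raw address arrives from shared memory and is
  // restored with reinterpret_cast; here we just create the handle directly.
  fake_factory* raw = fake_factory_new(42);
  {
    std::unique_ptr<fake_factory, FactoryDeleter> lfactory(raw);
  }  // factory freed exactly once here, even on early return or throw
  return 0;
}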
5 changes: 0 additions & 5 deletions src/response_sender.cc
@@ -106,8 +106,6 @@ ResponseSender::UpdateStateAndCounters(
   }
 
   if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-    std::cerr << "=== ResponseSender -> UpdateStateAndCounters closing RF ==="
-              << std::endl;
     response_factory_deleted_.exchange(true);
     closed_ = true;
   }
@@ -177,7 +175,6 @@ ResponseSender::Send(
     bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
     // The server will destruct the response factory if the final flag is set.
     if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-      std::cerr << "====== scoped_defer -> closing RF =====" << std::endl;
       response_factory_deleted_.exchange(true);
     }
     stub->SendIPCUtilsMessage(ipc_message);
@@ -280,8 +277,6 @@ ResponseSender::Close()
 void
 ResponseSender::DeleteResponseFactory()
 {
-  std::cerr << "=== ResponseSender -> DeleteResponseFactory, "
-            << response_factory_deleted_ << " ===" << std::endl;
   bool already_deleted = response_factory_deleted_.exchange(true);
   if (!already_deleted) {
     std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
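DeleteResponseFactory guards the actual cleanup with response_factory_deleted_.exchange(true): the exchange atomically sets the flag and returns its previous value, so exactly one caller observes false and performs the deletion, no matter how Send, Close, and the destructor race. A minimal sketch of that once-only idiom, with an invented delete_factory_once function (compile with -pthread):

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<bool> factory_deleted{false};

void delete_factory_once() {
  // exchange(true) returns the prior value: exactly one caller sees `false`
  // and runs the cleanup; every later (or concurrent) caller sees `true`.
  bool already_deleted = factory_deleted.exchange(true);
  if (!already_deleted) {
    std::cout << "cleanup ran once" << std::endl;
  }
}

int main() {
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i) {
    threads.emplace_back(delete_factory_once);
  }
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}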
