Skip to content

Commit

Permalink
[hotfix] Fix namespaces in flink compaction filter
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Feb 22, 2024
1 parent 971d425 commit 96705d2
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions java/rocksjni/flink_compactionfilterjni.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include "rocksjni/jnicallback.h"
#include "utilities/flink/flink_compaction_filter.h"

using namespace ROCKSDB_NAMESPACE::flink;

class JniCallbackBase : public ROCKSDB_NAMESPACE::JniCallback {
public:
JniCallbackBase(JNIEnv* env, jobject jcallback_obj)
Expand Down Expand Up @@ -94,7 +92,8 @@ class JavaListElemenFilterFactory
assert(m_jcreate_filter_methodid != nullptr);
}

FlinkCompactionFilter::ListElementFilter* CreateListElementFilter(
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilter*
CreateListElementFilter(
std::shared_ptr<ROCKSDB_NAMESPACE::Logger> /*logger*/) const override {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
Expand Down Expand Up @@ -141,16 +140,16 @@ class JavaTimeProvider
jmethodID m_jcurrent_timestamp_methodid;
};

static FlinkCompactionFilter::ListElementFilterFactory*
createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len,
jobject jlist_filter_factory) {
FlinkCompactionFilter::ListElementFilterFactory* list_filter_factory =
nullptr;
static ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::
ListElementFilterFactory*
createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len,
jobject jlist_filter_factory) {
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilterFactory*
list_filter_factory = nullptr;
if (ji_list_elem_len > 0) {
auto fixed_size = static_cast<std::size_t>(ji_list_elem_len);
list_filter_factory =
new FlinkCompactionFilter::FixedListElementFilterFactory(
fixed_size, static_cast<std::size_t>(0));
list_filter_factory = new ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::
FixedListElementFilterFactory(fixed_size, static_cast<std::size_t>(0));
} else if (jlist_filter_factory != nullptr) {
list_filter_factory =
new JavaListElemenFilterFactory(env, jlist_filter_factory);
Expand All @@ -165,10 +164,10 @@ createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len,
*/
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfigHolder(
JNIEnv* /* env */, jclass /* jcls */) {
using namespace ROCKSDB_NAMESPACE::flink;
return reinterpret_cast<jlong>(
new std::shared_ptr<FlinkCompactionFilter::ConfigHolder>(
new FlinkCompactionFilter::ConfigHolder()));
new std::shared_ptr<
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder>(
new ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder()));
}

/*
Expand All @@ -178,10 +177,8 @@ jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfi
*/
void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHolder(
JNIEnv* /* env */, jclass /* jcls */, jlong handle) {
using namespace ROCKSDB_NAMESPACE::flink;
auto* config_holder =
reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
handle);
auto* config_holder = reinterpret_cast<std::shared_ptr<
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder>*>(handle);
delete config_holder;
}

Expand All @@ -193,9 +190,9 @@ void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHo
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0(
JNIEnv* env, jclass /* jcls */, jlong config_holder_handle,
jobject jtime_provider, jlong logger_handle) {
using namespace ROCKSDB_NAMESPACE::flink;
auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
*(reinterpret_cast<std::shared_ptr<
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder>*>(
config_holder_handle));
auto time_provider = new JavaTimeProvider(env, jtime_provider);
auto logger =
Expand All @@ -204,10 +201,13 @@ jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0(
: *(reinterpret_cast<
std::shared_ptr<ROCKSDB_NAMESPACE::LoggerJniCallback>*>(
logger_handle));
return reinterpret_cast<jlong>(new FlinkCompactionFilter(
config_holder,
std::unique_ptr<FlinkCompactionFilter::TimeProvider>(time_provider),
logger));
return reinterpret_cast<jlong>(
new ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter(
config_holder,
std::unique_ptr<
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::TimeProvider>(
time_provider),
logger));
}

/*
Expand All @@ -221,19 +221,21 @@ jboolean Java_org_rocksdb_FlinkCompactionFilter_configureFlinkCompactionFilter(
jlong jquery_time_after_num_entries, jint ji_list_elem_len,
jobject jlist_filter_factory) {
auto state_type =
static_cast<FlinkCompactionFilter::StateType>(ji_state_type);
static_cast<ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::StateType>(
ji_state_type);
auto timestamp_offset = static_cast<size_t>(ji_timestamp_offset);
auto ttl = static_cast<int64_t>(jl_ttl_milli);
auto query_time_after_num_entries =
static_cast<int64_t>(jquery_time_after_num_entries);
auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
*(reinterpret_cast<std::shared_ptr<
ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder>*>(
handle));
auto list_filter_factory = createListElementFilterFactory(
env, ji_list_elem_len, jlist_filter_factory);
auto config = new FlinkCompactionFilter::Config{
auto config = new ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::Config{
state_type, timestamp_offset, ttl, query_time_after_num_entries,
std::unique_ptr<FlinkCompactionFilter::ListElementFilterFactory>(
list_filter_factory)};
std::unique_ptr<ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::
ListElementFilterFactory>(list_filter_factory)};
return static_cast<jboolean>(config_holder->Configure(config));
}
}

0 comments on commit 96705d2

Please sign in to comment.