diff --git a/gst_bridge/CMakeLists.txt b/gst_bridge/CMakeLists.txt index 88ba98c..1fa420c 100644 --- a/gst_bridge/CMakeLists.txt +++ b/gst_bridge/CMakeLists.txt @@ -22,6 +22,11 @@ if($ENV{CLION_IDE}) set(audio_msgs_DIR "${PROJECT_SOURCE_DIR}/../../../install/audio_msgs/share/audio_msgs/cmake") endif() +find_package(PkgConfig REQUIRED) + +# if(NOT TRACETOOLS_TRACEPOINTS_EXCLUDED) +pkg_search_module(lttng-ust REQUIRED IMPORTED_TARGET lttng-ust) +# endif() # find dependencies find_package(ament_cmake REQUIRED) @@ -31,35 +36,28 @@ find_package(rclcpp REQUIRED) list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") # find gstreamer -set(gstreamer_components app pbutils audio fft net) -find_package(GStreamer 1.8.3 REQUIRED COMPONENTS ${gstreamer_components}) +pkg_search_module(gstreamer REQUIRED IMPORTED_TARGET gstreamer-1.0) +pkg_search_module(gstreamer-base REQUIRED IMPORTED_TARGET gstreamer-base-1.0) +pkg_search_module(gstreamer-app REQUIRED IMPORTED_TARGET gstreamer-app-1.0) +pkg_search_module(gstreamer-audio REQUIRED IMPORTED_TARGET gstreamer-audio-1.0) +pkg_search_module(gstreamer-video REQUIRED IMPORTED_TARGET gstreamer-video-1.0) + # find glib set(glib_components gio gio-unix gobject gthread gmodule) -find_package(GLIB 2.28 REQUIRED COMPONENTS ${glib_components}) +find_package(GLIB REQUIRED COMPONENTS ${glib_components}) ## Include messages find_package(std_msgs REQUIRED) find_package(audio_msgs REQUIRED) find_package(sensor_msgs REQUIRED) find_package(builtin_interfaces REQUIRED) -# find_package(rosidl_default_generators REQUIRED) - -## Generate added messages and services with any dependencies listed here -# rosidl_generate_interfaces(${PROJECT_NAME} -# ${msg_files} -# DEPENDENCIES builtin_interfaces std_msgs -# ) - ########### ## Build ## ########### - - - # This is a gstreamer plugin, not a ros node # the install location needs to be found by gst-inspect @@ -74,11 +72,8 @@ add_library(rosgstbridge SHARED src/rosaudiosrc.cpp src/rosimagesrc.cpp src/rostextsrc.cpp - ) - - -#rosidl_target_interfaces(rosgstbridge -# ${PROJECT_NAME} "rosidl_typesupport_cpp") + src/gst_bridge_tpp.cpp +) # ament has a habit of pruning unused linked functions required by gstreamer # symptom of this is 'undefined symbol: gst_audio_sink_get_type' on gst-inspect @@ -87,12 +82,10 @@ target_include_directories(rosgstbridge PRIVATE ${sensor_msgs_INCLUDE_DIRS} ${audio_msgs_INCLUDE_DIRS} $ + $ $ ${GLIB_INCLUDE_DIRS} ${GOBJECT_INCLUDE_DIR} - ${GSTREAMER_INCLUDE_DIRS} - ${GSTREAMER_BASE_INCLUDE_DIRS} - ${GST_INCLUDE_DIRS} ) target_link_libraries(rosgstbridge PUBLIC @@ -102,12 +95,12 @@ target_link_libraries(rosgstbridge PUBLIC ${GLIB_LIBRARIES} ${GLIB_GIO_LIBRARIES} ${GLIB_GOBJECT_LIBRARIES} - ${GSTREAMER_LIBRARIES} - ${GSTREAMER_AUDIO_LIBRARIES} - ${GSTREAMER_APP_LIBRARIES} - ${GSTREAMER_VIDEO_LIBRARIES} - ${GSTREAMER_BASE_LIBRARIES} - ${GST_LIBRARIES} + PkgConfig::gstreamer + PkgConfig::gstreamer-app + PkgConfig::gstreamer-audio + PkgConfig::gstreamer-base + PkgConfig::gstreamer-video + PkgConfig::lttng-ust ) @@ -123,9 +116,6 @@ target_include_directories(gst_bridge PUBLIC $ ${GLIB_INCLUDE_DIRS} ${GOBJECT_INCLUDE_DIR} - ${GSTREAMER_INCLUDE_DIRS} - ${GSTREAMER_BASE_INCLUDE_DIRS} - ${GST_INCLUDE_DIRS} ) target_link_libraries(gst_bridge PUBLIC ${rclcpp_LIBRARIES} @@ -133,17 +123,15 @@ target_link_libraries(gst_bridge PUBLIC ${audio_msgs_LIBRARIES} ${GLIB_LIBRARIES} ${GLIB_GIO_LIBRARIES} - ${GSTREAMER_LIBRARIES} - ${GSTREAMER_AUDIO_LIBRARIES} - ${GSTREAMER_APP_LIBRARIES} - ${GSTREAMER_VIDEO_LIBRARIES} - ${GSTREAMER_BASE_LIBRARIES} - ${GST_LIBRARIES} + PkgConfig::gstreamer + PkgConfig::gstreamer-app + PkgConfig::gstreamer-audio + PkgConfig::gstreamer-base + PkgConfig::gstreamer-video ) ament_export_include_directories(include) -#ament_export_libraries(gst_bridge src/gst_bridge.cpp) ament_export_libraries(gst_bridge) install(DIRECTORY include/ diff --git a/gst_bridge/cmake/FindGStreamer.cmake b/gst_bridge/cmake/FindGStreamer.cmake deleted file mode 100644 index b508efe..0000000 --- a/gst_bridge/cmake/FindGStreamer.cmake +++ /dev/null @@ -1,135 +0,0 @@ -# - Try to find GStreamer and its plugins -# Once done, this will define -# -# GSTREAMER_FOUND - system has GStreamer -# GSTREAMER_INCLUDE_DIRS - the GStreamer include directories -# GSTREAMER_LIBRARIES - link these to use GStreamer -# -# Additionally, gstreamer-base is always looked for and required, and -# the following related variables are defined: -# -# GSTREAMER_BASE_INCLUDE_DIRS - gstreamer-base's include directory -# GSTREAMER_BASE_LIBRARIES - link to these to use gstreamer-base -# -# Optionally, the COMPONENTS keyword can be passed to find_package() -# and GStreamer plugins can be looked for. Currently, the following -# plugins can be searched, and they define the following variables if -# found: -# -# gstreamer-app: GSTREAMER_APP_INCLUDE_DIRS and GSTREAMER_APP_LIBRARIES -# gstreamer-audio: GSTREAMER_AUDIO_INCLUDE_DIRS and GSTREAMER_AUDIO_LIBRARIES -# gstreamer-fft: GSTREAMER_FFT_INCLUDE_DIRS and GSTREAMER_FFT_LIBRARIES -# gstreamer-gl: GSTREAMER_GL_INCLUDE_DIRS and GSTREAMER_GL_LIBRARIES -# gstreamer-mpegts: GSTREAMER_MPEGTS_INCLUDE_DIRS and GSTREAMER_MPEGTS_LIBRARIES -# gstreamer-pbutils: GSTREAMER_PBUTILS_INCLUDE_DIRS and GSTREAMER_PBUTILS_LIBRARIES -# gstreamer-tag: GSTREAMER_TAG_INCLUDE_DIRS and GSTREAMER_TAG_LIBRARIES -# gstreamer-video: GSTREAMER_VIDEO_INCLUDE_DIRS and GSTREAMER_VIDEO_LIBRARIES -# gstreamer-codecparser:GSTREAMER_CODECPARSERS_INCLUDE_DIRS and GSTREAMER_CODECPARSERS_LIBRARIES -# -# Copyright (C) 2012 Raphael Kubo da Costa -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND ITS CONTRIBUTORS ``AS -# IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR ITS -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; -# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -find_package(PkgConfig QUIET) - -# Helper macro to find a GStreamer plugin (or GStreamer itself) -# _component_prefix is prepended to the _INCLUDE_DIRS and _LIBRARIES variables (eg. "GSTREAMER_AUDIO") -# _pkgconfig_name is the component's pkg-config name (eg. "gstreamer-1.0", or "gstreamer-video-1.0"). -# _library is the component's library name (eg. "gstreamer-1.0" or "gstvideo-1.0") -macro(FIND_GSTREAMER_COMPONENT _component_prefix _pkgconfig_name _library) - - string(REGEX MATCH "(.*)>=(.*)" _dummy "${_pkgconfig_name}") - if ("${CMAKE_MATCH_2}" STREQUAL "") - pkg_check_modules(PC_${_component_prefix} "${_pkgconfig_name} >= ${GStreamer_FIND_VERSION}") - else () - pkg_check_modules(PC_${_component_prefix} ${_pkgconfig_name}) - endif () - set(${_component_prefix}_INCLUDE_DIRS ${PC_${_component_prefix}_INCLUDE_DIRS}) - - find_library(${_component_prefix}_LIBRARIES - NAMES ${_library} - HINTS ${PC_${_component_prefix}_LIBRARY_DIRS} ${PC_${_component_prefix}_LIBDIR} - ) -endmacro() - -# ------------------------ -# 1. Find GStreamer itself -# ------------------------ - -# 1.1. Find headers and libraries -FIND_GSTREAMER_COMPONENT(GSTREAMER gstreamer-1.0 gstreamer-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_BASE gstreamer-base-1.0 gstbase-1.0) - -# ------------------------- -# 2. Find GStreamer plugins -# ------------------------- - -FIND_GSTREAMER_COMPONENT(GSTREAMER_APP gstreamer-app-1.0 gstapp-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_AUDIO gstreamer-audio-1.0 gstaudio-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_FFT gstreamer-fft-1.0 gstfft-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_GL gstreamer-gl-1.0 gstgl-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_MPEGTS gstreamer-mpegts-1.0>=1.4.0 gstmpegts-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_PBUTILS gstreamer-pbutils-1.0 gstpbutils-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_TAG gstreamer-tag-1.0 gsttag-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_VIDEO gstreamer-video-1.0 gstvideo-1.0) -FIND_GSTREAMER_COMPONENT(GSTREAMER_CODECPARSERS gstreamer-codecparsers-1.0 gstcodecparsers-1.0) - -# ------------------------------------------------ -# 3. Process the COMPONENTS passed to FIND_PACKAGE -# ------------------------------------------------ -set(_GSTREAMER_REQUIRED_VARS GSTREAMER_INCLUDE_DIRS GSTREAMER_LIBRARIES GSTREAMER_VERSION GSTREAMER_BASE_INCLUDE_DIRS GSTREAMER_BASE_LIBRARIES) - -foreach (_component ${GStreamer_FIND_COMPONENTS}) - set(_gst_component "GSTREAMER_${_component}") - string(TOUPPER ${_gst_component} _UPPER_NAME) - - list(APPEND _GSTREAMER_REQUIRED_VARS ${_UPPER_NAME}_INCLUDE_DIRS ${_UPPER_NAME}_LIBRARIES) -endforeach () - -include(FindPackageHandleStandardArgs) -FIND_PACKAGE_HANDLE_STANDARD_ARGS(GStreamer REQUIRED_VARS _GSTREAMER_REQUIRED_VARS - VERSION_VAR GSTREAMER_VERSION) - -mark_as_advanced( - GSTREAMER_APP_INCLUDE_DIRS - GSTREAMER_APP_LIBRARIES - GSTREAMER_AUDIO_INCLUDE_DIRS - GSTREAMER_AUDIO_LIBRARIES - GSTREAMER_BASE_INCLUDE_DIRS - GSTREAMER_BASE_LIBRARIES - GSTREAMER_FFT_INCLUDE_DIRS - GSTREAMER_FFT_LIBRARIES - GSTREAMER_GL_INCLUDE_DIRS - GSTREAMER_GL_LIBRARIES - GSTREAMER_INCLUDE_DIRS - GSTREAMER_LIBRARIES - GSTREAMER_MPEGTS_INCLUDE_DIRS - GSTREAMER_MPEGTS_LIBRARIES - GSTREAMER_PBUTILS_INCLUDE_DIRS - GSTREAMER_PBUTILS_LIBRARIES - GSTREAMER_TAG_INCLUDE_DIRS - GSTREAMER_TAG_LIBRARIES - GSTREAMER_VIDEO_INCLUDE_DIRS - GSTREAMER_VIDEO_LIBRARIES - GSTREAMER_CODECPARSERS_INCLUDE_DIRS - GSTREAMER_CODECPARSERS_LIBRARIES -) diff --git a/gst_bridge/include/gst_bridge/rosbasesink.h b/gst_bridge/include/gst_bridge/rosbasesink.h index 16aac6c..e1b067b 100644 --- a/gst_bridge/include/gst_bridge/rosbasesink.h +++ b/gst_bridge/include/gst_bridge/rosbasesink.h @@ -43,8 +43,8 @@ struct _RosBaseSink GstBaseSink parent; gchar* node_name; gchar* node_namespace; + gint64 offset_time_ns; - rclcpp::Context::SharedPtr ros_context; rclcpp::Executor::SharedPtr ros_executor; rclcpp::Node::SharedPtr node; rclcpp::Logger logger; @@ -72,7 +72,7 @@ struct _RosBaseSinkClass /* - * destroy the ros publisher(s) and unregister your callbacks and timers and prepare for ros_context->shutdown() + * destroy the ros publisher(s) and unregister your callbacks and timers and prepare for the shutdown of the rclcpp context * called at gstbasesink->change_state() GST_STATE_CHANGE_READY_TO_NULL * timers and reconf callbacks are currently broken, needs a new thread with an executor, patches welcome */ diff --git a/gst_bridge/include/gst_bridge/rosbasesrc.h b/gst_bridge/include/gst_bridge/rosbasesrc.h index 1c651bf..de2b519 100644 --- a/gst_bridge/include/gst_bridge/rosbasesrc.h +++ b/gst_bridge/include/gst_bridge/rosbasesrc.h @@ -24,6 +24,7 @@ //include ROS and ROS message formats #include +#include G_BEGIN_DECLS @@ -44,7 +45,6 @@ struct _RosBaseSrc gchar* node_name; gchar* node_namespace; - rclcpp::Context::SharedPtr ros_context; rclcpp::Executor::SharedPtr ros_executor; rclcpp::Node::SharedPtr node; rclcpp::Logger logger; @@ -72,12 +72,14 @@ struct _RosBaseSrcClass /* - * destroy the ros subscription(s) and unregister your callbacks and timers and prepare for ros_context->shutdown() + * destroy the ros subscription(s) and unregister your callbacks and timers and prepare for shutdown of the rclcpp context * called at gstbasesrc->change_state() GST_STATE_CHANGE_READY_TO_NULL * timers and reconf callbacks are currently broken, needs a new thread with an executor, patches welcome */ gboolean (*close) (RosBaseSrc * src); + gboolean (*notify_thread) (RosBaseSrc * src); + }; GType rosbasesrc_get_type (void); diff --git a/gst_bridge/include/gst_bridge/rosimagesink.h b/gst_bridge/include/gst_bridge/rosimagesink.h index 4f7dca6..c73e956 100644 --- a/gst_bridge/include/gst_bridge/rosimagesink.h +++ b/gst_bridge/include/gst_bridge/rosimagesink.h @@ -26,6 +26,7 @@ //include ROS and ROS message formats #include +#include #include diff --git a/gst_bridge/include/gst_bridge/rosimagesrc.h b/gst_bridge/include/gst_bridge/rosimagesrc.h index 4c080ed..ace77ce 100644 --- a/gst_bridge/include/gst_bridge/rosimagesrc.h +++ b/gst_bridge/include/gst_bridge/rosimagesrc.h @@ -27,7 +27,7 @@ //include ROS and ROS message formats #include #include -#include // std::queue +#include #include // std::mutex, std::unique_lock #include // std::condition_variable @@ -56,7 +56,7 @@ struct _Rosimagesrc // XXX this is too much boilerplate. size_t msg_queue_max; - std::queue msg_queue; + std::deque msg_queue; std::mutex msg_queue_mtx; std::condition_variable msg_queue_cv; diff --git a/gst_bridge/package.xml b/gst_bridge/package.xml index add9f94..4d23198 100644 --- a/gst_bridge/package.xml +++ b/gst_bridge/package.xml @@ -14,6 +14,8 @@ libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev + liblttng-ust-dev + liblttng-ctl-dev std_msgs audio_msgs sensor_msgs diff --git a/gst_bridge/src/gst_bridge_tpp.cpp b/gst_bridge/src/gst_bridge_tpp.cpp new file mode 100644 index 0000000..25108a5 --- /dev/null +++ b/gst_bridge/src/gst_bridge_tpp.cpp @@ -0,0 +1,2 @@ +#define LTTNG_UST_TRACEPOINT_CREATE_PROBES +#include "gst_bridge_tpp.h" diff --git a/gst_bridge/src/gst_bridge_tpp.h b/gst_bridge/src/gst_bridge_tpp.h new file mode 100644 index 0000000..af79dbc --- /dev/null +++ b/gst_bridge/src/gst_bridge_tpp.h @@ -0,0 +1,41 @@ +#undef LTTNG_UST_TRACEPOINT_PROVIDER +#define LTTNG_UST_TRACEPOINT_PROVIDER gst_bridge + +#undef LTTNG_UST_TRACEPOINT_INCLUDE +#define LTTNG_UST_TRACEPOINT_INCLUDE "./gst_bridge_tpp.h" + +#if !defined(_TP_H) || defined(LTTNG_UST_TRACEPOINT_HEADER_MULTI_READ) +#define _TP_H + +#include + +TRACEPOINT_EVENT( + LTTNG_UST_TRACEPOINT_PROVIDER, + gst_sink_render, + TP_ARGS( + const void *, ros_base_sink_handle_arg, + int64_t, source_timestamp_arg + ), + TP_FIELDS( + ctf_integer_hex(const void *, ros_base_sink_handle, ros_base_sink_handle_arg) + ctf_integer(int64_t, source_timestamp, source_timestamp_arg) + ) +) + +TRACEPOINT_EVENT( + LTTNG_UST_TRACEPOINT_PROVIDER, + gst_sink_open, + TP_ARGS( + const void *, node_handle_arg, + const void *, ros_base_sink_handle_arg + ), + TP_FIELDS( + ctf_integer_hex(const void *, node_handle, node_handle_arg) + ctf_integer_hex(const void *, ros_base_sink_handle, ros_base_sink_handle_arg) + ) +) + +#endif /* _TP_H */ + +#include + diff --git a/gst_bridge/src/rosaudiosrc.cpp b/gst_bridge/src/rosaudiosrc.cpp index 095c5ba..663fddf 100644 --- a/gst_bridge/src/rosaudiosrc.cpp +++ b/gst_bridge/src/rosaudiosrc.cpp @@ -556,7 +556,7 @@ static void rosaudiosrc_sub_cb(Rosaudiosrc * src, audio_msgs::msg::Audio::ConstS GST_AUDIO_INFO_RATE(&(src->audio_info)), msg->sample_rate); if(gst_bridge::getRosEncoding(GST_AUDIO_INFO_FORMAT(&(src->audio_info))) != msg->encoding.c_str() ) RCLCPP_ERROR(ros_base_src->logger, "audio format changed during playback, encoding %s != %s", - gst_bridge::getRosEncoding(GST_AUDIO_INFO_FORMAT(&(src->audio_info))), msg->encoding.c_str()); // XXX account for the override + gst_bridge::getRosEncoding(GST_AUDIO_INFO_FORMAT(&(src->audio_info))).c_str(), msg->encoding.c_str()); // XXX account for the override if(GST_AUDIO_INFO_ENDIANNESS(&(src->audio_info)) != (msg->is_bigendian ? G_BIG_ENDIAN : G_LITTLE_ENDIAN)) RCLCPP_ERROR(ros_base_src->logger, "audio format changed during playback, endianness %d != %d", GST_AUDIO_INFO_ENDIANNESS(&(src->audio_info)), (msg->is_bigendian ? G_BIG_ENDIAN : G_LITTLE_ENDIAN)); diff --git a/gst_bridge/src/rosbasesink.cpp b/gst_bridge/src/rosbasesink.cpp index f726e0a..3e03af5 100644 --- a/gst_bridge/src/rosbasesink.cpp +++ b/gst_bridge/src/rosbasesink.cpp @@ -33,6 +33,8 @@ #include +#define LTTNG_UST_TRACEPOINT_DEFINE +#include "gst_bridge_tpp.h" GST_DEBUG_CATEGORY_STATIC (rosbasesink_debug_category); #define GST_CAT_DEFAULT rosbasesink_debug_category @@ -64,6 +66,7 @@ enum PROP_ROS_NAME, PROP_ROS_NAMESPACE, PROP_ROS_START_TIME, + PROP_ROS_TIME_OFFSET, }; @@ -107,6 +110,12 @@ static void rosbasesink_class_init (RosBaseSinkClass * klass) (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)) ); + g_object_class_install_property (object_class, PROP_ROS_TIME_OFFSET, + g_param_spec_int64 ("ros-time-offset", "ros-time-offset", "ROS time offset (nanoseconds)", + G_MININT64, G_MAXINT64, GST_CLOCK_TIME_NONE, + (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)) + ); + element_class->change_state = GST_DEBUG_FUNCPTR (rosbasesink_change_state); //use state change events to open and close publishers basesink_class->render = GST_DEBUG_FUNCPTR (rosbasesink_render); // gives us a buffer to forward @@ -117,6 +126,7 @@ static void rosbasesink_init (RosBaseSink * sink) sink->node_name = g_strdup("gst_base_sink_node"); sink->node_namespace = g_strdup(""); sink->stream_start_prop = GST_CLOCK_TIME_NONE; + sink->offset_time_ns = 0; } void rosbasesink_set_property (GObject * object, guint property_id, @@ -162,6 +172,17 @@ void rosbasesink_set_property (GObject * object, guint property_id, } break; + case PROP_ROS_TIME_OFFSET: + if(sink->node) + { + RCLCPP_ERROR(sink->logger, "can't change time_offset once opened"); + } + else + { + sink->offset_time_ns = g_value_get_int64(value); + } + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -188,6 +209,10 @@ void rosbasesink_get_property (GObject * object, guint property_id, // XXX this allows inspection via props, // but may cause confusion because it does not show the actual prop break; + + case PROP_ROS_TIME_OFFSET: + g_value_set_int64(value, sink->offset_time_ns); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); @@ -260,16 +285,18 @@ static gboolean rosbasesink_open (RosBaseSink * sink) gboolean result = TRUE; GST_DEBUG_OBJECT (sink, "open"); - sink->ros_context = std::make_shared(); - sink->ros_context->init(0, NULL); // XXX should expose the init arg list - auto opts = rclcpp::NodeOptions(); - opts.context(sink->ros_context); //set a context to generate the node in - sink->node = std::make_shared(std::string(sink->node_name), std::string(sink->node_namespace), opts); - - auto ex_args = rclcpp::ExecutorOptions(); - ex_args.context = sink->ros_context; - sink->ros_executor = std::make_shared(ex_args); - sink->ros_executor->add_node(sink->node); + try { + rclcpp::init(0, NULL, rclcpp::InitOptions(), rclcpp::SignalHandlerOptions::None); + sink->node = std::make_shared(std::string(sink->node_name), std::string(sink->node_namespace)); + sink->ros_executor = std::make_shared(); + sink->ros_executor->add_node(sink->node); + lttng_ust_tracepoint(gst_bridge, gst_sink_open, static_cast(sink->node->get_node_base_interface()->get_rcl_node_handle()), static_cast(sink)); + } + catch (const std::exception &e) + { + RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "failed to create node: %s", e.what()); + return FALSE; + } // allow sub-class to create publishers on sink->node if(sink_class->open) @@ -292,19 +319,18 @@ static gboolean rosbasesink_close (RosBaseSink * sink) GST_DEBUG_OBJECT (sink, "close"); - sink->clock.reset(); - //allow sub-class to clean up before destroying ros context if(sink_class->close) result = sink_class->close(sink); - // XXX do something with result - //XXX executor sink->ros_executor->cancel(); sink->spin_thread.join(); + sink->clock.reset(); sink->node.reset(); - sink->ros_context->shutdown("gst closing rosbasesink"); + sink->ros_executor.reset(); + + rclcpp::shutdown(); return result; } @@ -328,6 +354,8 @@ static GstFlowReturn rosbasesink_render (GstBaseSink * base_sink, GstBuffer * bu base_time = gst_element_get_base_time(GST_ELEMENT(sink)); msg_time = rclcpp::Time(GST_BUFFER_PTS(buf) + base_time + sink->ros_clock_offset, sink->clock->get_clock_type()); + lttng_ust_tracepoint(gst_bridge, gst_sink_render, static_cast(sink_class), msg_time.nanoseconds()); + if(NULL != sink_class->render) return sink_class->render(sink, buf, msg_time); diff --git a/gst_bridge/src/rosbasesrc.cpp b/gst_bridge/src/rosbasesrc.cpp index 9ad3e26..254df1e 100644 --- a/gst_bridge/src/rosbasesrc.cpp +++ b/gst_bridge/src/rosbasesrc.cpp @@ -51,7 +51,7 @@ static void rosbasesrc_init (RosBaseSrc * src); static gboolean rosbasesrc_open (RosBaseSrc * src); static gboolean rosbasesrc_close (RosBaseSrc * src); static void spin_wrapper(RosBaseSrc * src); - +static gboolean rosbasesrc_notify_thread (RosBaseSrc * src); /* XXX provide a mechanism for ROS to provide a clock @@ -64,6 +64,7 @@ enum PROP_ROS_NAME, PROP_ROS_NAMESPACE, PROP_ROS_START_TIME, + PROP_ROS_TIME_OFFSET, }; /* class initialization */ @@ -231,8 +232,10 @@ static GstStateChangeReturn rosbasesrc_change_state (GstElement * element, GstSt break; } case GST_STATE_CHANGE_READY_TO_PAUSED: - //XXX stop the subscription + //XXX stop the subscription case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + rosbasesrc_notify_thread(src); + break; case GST_STATE_CHANGE_PAUSED_TO_READY: default: break; @@ -267,16 +270,17 @@ static gboolean rosbasesrc_open (RosBaseSrc * src) GST_DEBUG_OBJECT (src, "open"); - src->ros_context = std::make_shared(); - src->ros_context->init(0, NULL); // XXX should expose the init arg list - auto opts = rclcpp::NodeOptions(); - opts.context(src->ros_context); //set a context to generate the node in - src->node = std::make_shared(std::string(src->node_name), std::string(src->node_namespace), opts); - - auto ex_args = rclcpp::ExecutorOptions(); - ex_args.context = src->ros_context; - src->ros_executor = std::make_shared(ex_args); - src->ros_executor->add_node(src->node); + try { + rclcpp::init(0, NULL, rclcpp::InitOptions(), rclcpp::SignalHandlerOptions::None); + src->node = std::make_shared(std::string(src->node_name), std::string(src->node_namespace)); + src->ros_executor = std::make_shared(); + src->ros_executor->add_node(src->node); + } + catch (const std::exception &e) + { + RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "failed to create node: %s", e.what()); + return FALSE; + } // allow sub-class to create subscribers on src->node if(src_class->open) @@ -290,6 +294,19 @@ static gboolean rosbasesrc_open (RosBaseSrc * src) return result; } +static gboolean rosbasesrc_notify_thread (RosBaseSrc * src) +{ + RosBaseSrcClass *src_class = GST_ROS_BASE_SRC_GET_CLASS (src); + using std::placeholders::_1; + + GST_DEBUG_OBJECT (src, "notify_thread"); + + if(src_class->notify_thread) + src_class->notify_thread(src); + + return TRUE; +} + /* close the device */ static gboolean rosbasesrc_close (RosBaseSrc * src) { @@ -298,8 +315,6 @@ static gboolean rosbasesrc_close (RosBaseSrc * src) GST_DEBUG_OBJECT (src, "close"); gboolean result = TRUE; - src->clock.reset(); - //allow sub-class to clean up before destroying ros context if(src_class->close) result = src_class->close(src); @@ -307,13 +322,11 @@ static gboolean rosbasesrc_close (RosBaseSrc * src) //stop the executor src->ros_executor->cancel(); src->spin_thread.join(); - src->ros_context->shutdown("gst closing rosbasesrc"); - - //release anything held by shared pointer - src->ros_context.reset(); - src->ros_executor.reset(); - src->node.reset(); src->clock.reset(); + src->node.reset(); + src->ros_executor.reset(); + + rclcpp::shutdown(); return result; } diff --git a/gst_bridge/src/rosimagesink.cpp b/gst_bridge/src/rosimagesink.cpp index 948b6c5..f45d900 100644 --- a/gst_bridge/src/rosimagesink.cpp +++ b/gst_bridge/src/rosimagesink.cpp @@ -310,6 +310,37 @@ static GstFlowReturn rosimagesink_render (RosBaseSink * ros_base_sink, GstBuffer Rosimagesink *sink = GST_ROSIMAGESINK (ros_base_sink); GST_DEBUG_OBJECT (sink, "render"); + int32_t sec = 0; + uint32_t nsec = 0; + + // Make duration an absolute value due to duration object definition + if (abs(ros_base_sink->offset_time_ns) > G_MAXUINT32) + { + //need to to convert value to a integer value of seconds while leaving the rest in nanosecs to maintain precision + //as nsec of Duration is an uint32 object. + RCLCPP_DEBUG(ros_base_sink->logger, "time offset is greater than 32 bits"); + sec = static_cast(abs(ros_base_sink->offset_time_ns) / G_GINT64_CONSTANT(1000000000)); + nsec = static_cast(abs(ros_base_sink->offset_time_ns) - (sec * G_GINT64_CONSTANT(1000000000))); + } + else + { + sec = 0; + nsec = static_cast(abs(ros_base_sink->offset_time_ns)); + } + + rclcpp::Duration offset_time(sec, nsec); + + if (ros_base_sink->offset_time_ns < 0) + { + RCLCPP_DEBUG(ros_base_sink->logger, "time offset is negative %f seconds or %ld nanosecs", offset_time.seconds(), offset_time.nanoseconds()); + msg_time = msg_time - offset_time; + } + else + { + RCLCPP_DEBUG(ros_base_sink->logger, "time offset is positive %f seconds or %ld nanosecs", offset_time.seconds(), offset_time.nanoseconds()); + msg_time = msg_time + offset_time; + } + msg.header.stamp = msg_time; msg.header.frame_id = sink->frame_id; diff --git a/gst_bridge/src/rosimagesrc.cpp b/gst_bridge/src/rosimagesrc.cpp index 675075a..dbe9bd0 100644 --- a/gst_bridge/src/rosimagesrc.cpp +++ b/gst_bridge/src/rosimagesrc.cpp @@ -47,6 +47,7 @@ static void rosimagesrc_init (Rosimagesrc * src); static gboolean rosimagesrc_open (RosBaseSrc * ros_base_src); static gboolean rosimagesrc_close (RosBaseSrc * ros_base_src); static GstFlowReturn rosimagesrc_create (GstBaseSrc * base_src, guint64 offset, guint size, GstBuffer **buf); +static gboolean rosimagesrc_notify_thread (RosBaseSrc * ros_base_src); //static gboolean rosimagesrc_negotiate (GstBaseSrc * base_src); //static GstCaps* rosimagesrc_setcaps (GstBaseSrc * base_src, GstCaps * caps); //upstream returns any remaining caps preferences @@ -138,6 +139,7 @@ static void rosimagesrc_class_init (RosimagesrcClass * klass) ros_base_src_class->open = GST_DEBUG_FUNCPTR (rosimagesrc_open); //let the base sink know how we register publishers ros_base_src_class->close = GST_DEBUG_FUNCPTR (rosimagesrc_close); //let the base sink know how we destroy publishers + ros_base_src_class->notify_thread = GST_DEBUG_FUNCPTR (rosimagesrc_notify_thread); basesrc_class->create = GST_DEBUG_FUNCPTR(rosimagesrc_create); @@ -161,7 +163,7 @@ static void rosimagesrc_init (Rosimagesrc * src) src->msg_init = true; src->msg_queue_max = 1; // XXX why does queue segfault without expicit construction? - src->msg_queue = std::queue(); + src->msg_queue = std::deque(); /* configure basesrc to be a live source */ gst_base_src_set_live (GST_BASE_SRC (src), TRUE); @@ -341,12 +343,22 @@ static gboolean rosimagesrc_close (RosBaseSrc * ros_base_src) //XXX dereference is as close as foxy gets to unsubscribe src->sub.reset(); + //empty the queue std::unique_lock lck(src->msg_queue_mtx); - while(src->msg_queue.size() > 0) - { - src->msg_queue.pop(); - } + src->msg_queue.clear(); + + return TRUE; +} + +static gboolean rosimagesrc_notify_thread (RosBaseSrc * ros_base_src) +{ + Rosimagesrc *src = GST_ROSIMAGESRC (ros_base_src); + + GST_DEBUG_OBJECT (src, "notify_thread"); + + // notify any waiting threads + src->msg_queue_cv.notify_all(); return TRUE; } @@ -414,10 +426,17 @@ static GstCaps* rosimagesrc_getcaps (GstBaseSrc * base_src, GstCaps * filter) GST_DEBUG_OBJECT (src, "getcaps with node not ready, returning template"); return gst_pad_get_pad_template_caps (GST_BASE_SRC (src)->srcpad); } + GST_DEBUG_OBJECT (src, "getcaps with node ready, waiting for message"); RCLCPP_INFO(ros_base_src->logger, "waiting for first message"); msg = rosimagesrc_wait_for_msg(src); // XXX need to fix API, the action happens in a side-effect + // if(src->msg_init) + // { + // GST_DEBUG_OBJECT (src, "getcaps with message not rx'd, returning template"); + // return gst_pad_get_pad_template_caps (GST_BASE_SRC (src)->srcpad); + // } + format_enum = gst_bridge::getGstVideoFormat(std::string(src->encoding)); format_str = gst_video_format_to_string(format_enum); @@ -497,13 +516,17 @@ static GstFlowReturn rosimagesrc_create (GstBaseSrc * base_src, guint64 offset, } auto msg = rosimagesrc_wait_for_msg(src); - { //scope the mutex lock - std::unique_lock lck(src->msg_queue_mtx); - src->msg_queue.pop(); // XXX we can stop dropping the first message during preroll now + if (!msg) + { + GST_DEBUG_OBJECT (src, "no message to create buffer from"); + return GST_FLOW_ERROR; + } else { + { //scope the mutex lock + std::unique_lock lck(src->msg_queue_mtx); + src->msg_queue.clear(); // XXX we can stop dropping the first message during preroll now + } } - // XXX check message contains anything - length = msg->data.size(); if (*buf == NULL) { /* downstream did not provide us with a buffer to fill, allocate one @@ -549,7 +572,7 @@ static void rosimagesrc_sub_cb(Rosimagesrc * src, sensor_msgs::msg::Image::Const else { if(!(src->step == msg->step / msg->width)) - RCLCPP_ERROR(ros_base_src->logger, "image format changed during playback, step %d != %d", src->step, msg->step/msg->width); + RCLCPP_ERROR(ros_base_src->logger, "image format changed during playback, step %ld != %d", src->step, msg->step/msg->width); if(!(src->height == (int) msg->height)) RCLCPP_ERROR(ros_base_src->logger, "image format changed during playback, height %d != %d", src->height, msg->height); if(!(src->width == (int) msg->width)) @@ -562,10 +585,10 @@ static void rosimagesrc_sub_cb(Rosimagesrc * src, sensor_msgs::msg::Image::Const } std::unique_lock lck(src->msg_queue_mtx); - src->msg_queue.push(msg); + src->msg_queue.push_front(msg); while(src->msg_queue.size() > src->msg_queue_max) { - src->msg_queue.pop(); + src->msg_queue.pop_front(); RCLCPP_WARN(ros_base_src->logger, "dropping message"); } src->msg_queue_cv.notify_one(); @@ -577,11 +600,17 @@ static sensor_msgs::msg::Image::ConstSharedPtr rosimagesrc_wait_for_msg(Rosimage //RosBaseSrc *ros_base_src = GST_ROS_BASE_SRC (src); std::unique_lock lck(src->msg_queue_mtx); - while(src->msg_queue.empty()) + // while(src->msg_queue.empty()) + // { + src->msg_queue_cv.wait(lck); + // } + + if (src->msg_queue.empty()) { - src->msg_queue_cv.wait(lck); + // the wait was interrupted + return sensor_msgs::msg::Image::ConstSharedPtr(); } - auto msg = src->msg_queue.front(); + auto msg = src->msg_queue.front(); return msg; } diff --git a/gst_pipeline/setup.cfg b/gst_pipeline/setup.cfg index 888bc99..bf4252d 100644 --- a/gst_pipeline/setup.cfg +++ b/gst_pipeline/setup.cfg @@ -1,4 +1,4 @@ [develop] -script-dir=$base/lib/gst_pipeline +script_dir=$base/lib/gst_pipeline [install] -install-scripts=$base/lib/gst_pipeline +install_scripts=$base/lib/gst_pipeline