Giter Site home page Giter Site logo

Comments (4)

elBoberido avatar elBoberido commented on May 25, 2024 1

@sgf201 this is not an issue with iceoryx itself but with the framework that uses iceoryx and how it is integrated. In your case either the ROS 2 RMW or Cyclone DDS needs to adapt the way they integrated iceoryx. RouDi is and cannot handle allocations. This is the task of the publisher and the subscriber will release the allocated chunks. The memory chunks are also not automatically released but the user can control the lifetime and just needs to call iox_sub_release_chunk(iox_sub_t const self, const void* const userPayload) after the memory isn't in use anymore. If the framework you are using is doing this while you are still holding a pointer to the memory, then that framework needs to rethink its API or the way it provides you with the memory. Unfortunately there is not much we can do on the iceoryx side.

from iceoryx.

elBoberido avatar elBoberido commented on May 25, 2024

@sgf201 I move the code in the description to markdown code blocks to make it more readable, i.e.

```cpp
my_code
```

It seems something is missing or only partly added. Could you emend the example code.

I'm also not quite sure what you mean by subscriber alloc sample from roudi. Can you elaborate?

from iceoryx.

elBoberido avatar elBoberido commented on May 25, 2024

@sgf201 I skimmed over the issue you linked and I think this is not an iceoryx issue but an issue on how iceoryx is used in the rwm implementation or in the Cyclone DDS integration. Like @clalancette already mentioned, one just needs to add a custom deleter to the shared_ptr to abide the contract of the shared_ptr taking care of the lifetime.

from iceoryx.

sgf201 avatar sgf201 commented on May 25, 2024

@elBoberido
In the current implementation, assuming that the subscriber support zero copy, the messages it receives may be native serialization or may not be sent using zero copy due to restrictions. However, the current design requires that when the subscriber supports zero copy, it should use the same API to process the received messages. In the RMW layer, the received data is deserialized into a block of memory requested from the heap, which is released differently from the memory obtained from roudi

Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
{
  rclcpp::MessageInfo message_info;
  message_info.get_rmw_message_info().from_intra_process = false;

  if (subscription->is_serialized()) {
    // This is the case where a copy of the serialized message is taken from
    // the middleware via inter-process communication.
    std::shared_ptr<SerializedMessage> serialized_msg = subscription->create_serialized_message();
    take_and_do_error_handling(
      "taking a serialized message from topic",
      subscription->get_topic_name(),
      [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
      [&]()
      {
        subscription->handle_serialized_message(serialized_msg, message_info);
      });
    subscription->return_serialized_message(serialized_msg);
  } else if (subscription->can_loan_messages()) {
    // This is the case where a loaned message is taken from the middleware via
    // inter-process communication, given to the user for their callback,
    // and then returned.
    void * loaned_msg = nullptr;
    // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage
    //   is extened to support subscriptions as well.
    take_and_do_error_handling(
      "taking a loaned message from topic",
      subscription->get_topic_name(),
      [&]()
      {
        rcl_ret_t ret = rcl_take_loaned_message(
          subscription->get_subscription_handle().get(),
          &loaned_msg,
          &message_info.get_rmw_message_info(),
          nullptr);
        if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
          return false;
        } else if (RCL_RET_OK != ret) {
          rclcpp::exceptions::throw_from_rcl_error(ret);
        }
        return true;
      },
      [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
    if (nullptr != loaned_msg) {
      rcl_ret_t ret = rcl_return_loaned_message_from_subscription(
        subscription->get_subscription_handle().get(),
        loaned_msg);
      if (RCL_RET_OK != ret) {
        RCLCPP_ERROR(
          rclcpp::get_logger("rclcpp"),
          "rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s",
          subscription->get_topic_name(), rcl_get_error_string().str);
      }
      loaned_msg = nullptr;
    }
  } else {
    // This case is taking a copy of the message data from the middleware via
    // inter-process communication.
    std::shared_ptr<void> message = subscription->create_message();
    take_and_do_error_handling(
      "taking a message from topic",
      subscription->get_topic_name(),
      [&]() {return subscription->take_type_erased(message.get(), message_info);},
      [&]() {subscription->handle_message(message, message_info);});
    subscription->return_message(message);
  }
}

the custom deleter attached to the shared_ptr needs data_allocator to return loaned sample to roudi,but the lifecycle of data_allocator is tightly coupled with the taking of messages, in the current implementation, data_allocator should be released after each callback and reinitialized when the next message is taken. The reason for this is that subscribers cannot apply for samples from roudi, when the interface requires subscribers to apply for space to store data, they can only apply from the heap. When retrieving messages, they need to reinitialize the data_allocator based on the actual message type
Here is the implementation of ROS2 releasing the sample

static rmw_ret_t return_loaned_message_from_subscription_int(
  const rmw_subscription_t * subscription,
  void * loaned_message)
{
  // if the subscription allow loaning
  if (cdds_subscription->is_loaning_available) {
    return fini_and_free_sample(cdds_subscription, loaned_message);
......
  
static rmw_ret_t fini_and_free_sample(entityT & entity, void * loaned_message)
{
  // fini the message
  rmw_cyclonedds_cpp::fini_message(&entity->type_supports, loaned_message);
  // free the message memory
  RET_EXPECTED(
    **dds_data_allocator_free**(
      &entity->data_allocator,
      loaned_message),
    DDS_RETCODE_OK,
    "Failed to free the loaned message",
    return RMW_RET_ERROR);
  // fini the allocator
  RET_EXPECTED(
    **dds_data_allocator_fini**(&entity->data_allocator),
    DDS_RETCODE_OK,
    "Failed to fini data allocator",
    return RMW_RET_ERROR);
  return RMW_RET_OK;
}
......
dds_return_t dds_data_allocator_free (dds_data_allocator_t *data_allocator, void *ptr)
{
  dds_return_t ret = DDS_RETCODE_OK;

  if (data_allocator == NULL)
    return DDS_RETCODE_BAD_PARAMETER;

  if(**data_allocator->entity == DDS_DATA_ALLOCATOR_ALLOC_ON_HEAP**) {
    ddsrt_free(ptr);
  } else {
    dds_iox_allocator_t *d = (dds_iox_allocator_t *)data_allocator->opaque.bytes;
    switch (d->kind) {
      case DDS_IOX_ALLOCATOR_KIND_FINI:
        ret = DDS_RETCODE_PRECONDITION_NOT_MET;
        break;
      case DDS_IOX_ALLOCATOR_KIND_NONE:
        ddsrt_free(ptr);
        break;
      case DDS_IOX_ALLOCATOR_KIND_SUBSCRIBER:
        **if (ptr != NULL) {
          ddsrt_mutex_lock(&d->mutex);
          shm_lock_iox_sub(d->ref.sub);
          iox_sub_release_chunk(d->ref.sub, ptr);
          shm_unlock_iox_sub(d->ref.sub);
          ddsrt_mutex_unlock(&d->mutex);
        }**
        break;

if subscriber can alloc sample from roudi,the data_allocator will not change when taking new message,and adding a custom deleter to the shared_ptr is possible

from iceoryx.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.