Question: low inference performance with tbb::flow::graph — possible data-affinity issue #1911

@xxf1ow

Hi, I'm seeing lower-than-expected throughput (and higher-than-expected latency) in the inference stage when using tbb::flow::graph, and I would like advice from maintainers and experts.

Background / observed behavior

  • I have a video-stream model pipeline where inference time is much larger than preprocessing and postprocessing.
  • I run inference with the OpenVINO backend on an Intel CPU — all data, including model instances, live on the CPU.
  • I suspect the cause is data/memory affinity. Model instances come from a pool, the instance index is assigned round-robin at preprocess time, and the inference node then runs on whichever worker thread TBB picks, which may differ from the thread that last used that instance.
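
For comparison with the index-at-preprocess scheme in the last bullet, here is a minimal standard-library sketch of one alternative: the thread that runs inference leases whichever instance is free and returns it when done. It is not the pipeline's actual code and uses no TBB. `FakeModel`, `ModelPool`, `Lease`, and `run_pool_demo` are hypothetical names introduced only for illustration, and the pool hands back the most recently released instance first on the assumption that it is more likely to still be warm in cache.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a real model instance.
struct FakeModel {
    int runs = 0; // touched only by the thread currently holding the lease
    int infer(int x) { ++runs; return x * 2; }
};

template <typename Model>
class ModelPool {
public:
    // RAII handle: returns the instance to the pool when it goes out of scope.
    class Lease {
    public:
        Lease(ModelPool &pool, std::unique_ptr<Model> m) : _pool(&pool), _model(std::move(m)) {}
        Lease(Lease &&) noexcept = default;
        Lease(const Lease &) = delete;
        Lease &operator=(const Lease &) = delete;
        Lease &operator=(Lease &&) = delete;
        ~Lease() { if (_model) _pool->release(std::move(_model)); }
        Model *operator->() const { return _model.get(); }
    private:
        ModelPool *_pool;
        std::unique_ptr<Model> _model;
    };

    void add(std::unique_ptr<Model> m) { release(std::move(m)); }

    // Blocks until an instance is free, then hands out the most recently released one.
    Lease acquire() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return !_free.empty(); });
        auto m = std::move(_free.back());
        _free.pop_back();
        return Lease(*this, std::move(m));
    }

private:
    void release(std::unique_ptr<Model> m) {
        assert(m != nullptr);
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _free.push_back(std::move(m));
        }
        _cv.notify_one();
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<std::unique_ptr<Model>> _free;
};

// Runs `workers` threads against a pool of two instances and returns the total
// number of infer() calls recorded by the instances.
int run_pool_demo(int workers, int jobs_per_worker) {
    ModelPool<FakeModel> pool;
    pool.add(std::make_unique<FakeModel>());
    pool.add(std::make_unique<FakeModel>());
    std::vector<std::thread> threads;
    for (int w = 0; w < workers; ++w)
        threads.emplace_back([&pool, jobs_per_worker] {
            for (int j = 0; j < jobs_per_worker; ++j) {
                auto lease = pool.acquire(); // exclusive access for this scope
                lease->infer(j);
            }
        });
    for (auto &t : threads)
        t.join();
    auto a = pool.acquire();
    auto b = pool.acquire();
    return a->runs + b->runs;
}
```

With a lease, no two threads can hold the same instance at once, so the per-instance counter needs no atomics. Blocking inside a TBB node body is generally discouraged, so in a flow graph this would only be reasonable if the pool size matches the inference node's concurrency limit, in which case `acquire()` should not have to wait.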

Questions

  1. For a video-stream model inference scenario like mine, is tbb::flow::graph an appropriate choice?

  2. If inference performance is below expectations, what are the likely causes and remedies? Specifically:

    • Is this likely a problem in my implementation?
    • Is it likely a data-affinity (cache / NUMA / thread-locality) issue due to how model instances are assigned and used across threads?
    • Or could there be other causes (TBB scheduling, contention, model internals, memory allocation, etc.)?
      Any suggestions on how to diagnose and fix this would be appreciated.
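
One way to test the affinity hypothesis before changing the design is to count how often a model instance is executed by a different thread than the one that used it last. The following is a minimal sketch of such a counter using only the standard library. `AffinityProbe` and `probe_demo` are hypothetical names, and the idea is that `record(index)` would be called at the top of the inference body with the packet's model index. The mutex adds its own overhead, so this is meant for a diagnostic run only.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical diagnostic: counts how often a model index is used by a
// different thread than the one that used it on the previous call.
class AffinityProbe {
public:
    explicit AffinityProbe(std::size_t num_models)
        : _last(num_models), _calls(num_models, 0), _migrations(num_models, 0) {}

    // Record that the calling thread is about to use model `index`.
    void record(std::size_t index) {
        assert(index < _calls.size());
        const std::thread::id tid = std::this_thread::get_id();
        std::lock_guard<std::mutex> lock(_mutex);
        if (_calls[index] > 0 && _last[index] != tid)
            ++_migrations[index]; // same instance, different thread than last time
        _last[index] = tid;
        ++_calls[index];
    }

    std::size_t calls(std::size_t index) const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _calls[index];
    }

    std::size_t migrations(std::size_t index) const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _migrations[index];
    }

private:
    mutable std::mutex _mutex;
    std::vector<std::thread::id> _last;
    std::vector<std::size_t> _calls;
    std::vector<std::size_t> _migrations;
};

// Uses one model index from the calling thread, then a second thread,
// then the calling thread again, and returns the migration count.
std::size_t probe_demo() {
    AffinityProbe probe(1);
    probe.record(0);
    std::thread other([&probe] { probe.record(0); });
    other.join();
    probe.record(0);
    return probe.migrations(0);
}
```

A migration count close to the call count would confirm that instances move between threads, but it would not by itself show that migration is what costs time. Comparing per-call inference time on migrated and non-migrated calls would be the next step.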

My implementation

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

#include <tbb/flow_graph.h>

template <typename ModelHandlerType>
class PipelineNodeTbb
{
public:
    using ItemType = typename ModelHandlerType::ItemType;
    using ModelType = typename ModelHandlerType::ModelType;
    using InputType = std::optional<typename ModelHandlerType::InputType>;
    using OutputType = typename ModelHandlerType::OutputType;
    using ModelParams = const std::shared_ptr<typename ModelType::Params> &;
    using DataPacket = std::tuple<std::shared_ptr<ItemType>, uint64_t>;
    using InNode = tbb::flow::multifunction_node<InputType, std::tuple<DataPacket>>;
    using ExNode = tbb::flow::function_node<DataPacket, DataPacket>;
    using OuNode = tbb::flow::multifunction_node<DataPacket, std::tuple<OutputType>>;

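    tbb::flow::limiter_node<InputType> &get_input() { return *_limiter; } // Expose the entry node for external connection

    // Expose the output port (not the node itself) for external connection;
    // output_port<0> returns a reference to the port, so the return type is deduced.
    auto &get_output() { return tbb::flow::output_port<0>(*_postprocess); }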

    void start() { _stop_requested.store(false, std::memory_order_release); }

    void stop()
    {
        _stop_requested.store(true, std::memory_order_release);
        _preprocess->try_put(std::nullopt);
    }


    explicit PipelineNodeTbb(tbb::flow::graph &g, ModelParams model_cfg,
                             std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> fun,
                             int num_thread = 1, int token_capacity = 64)
        : _batch_size(std::max<int>(1, model_cfg->batch_size_)), _unpack(std::move(fun))
    {
        if (!_unpack)
            throw std::runtime_error("Invalid unpack function ...");
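        // hardware_concurrency() may return 0 when the value is not computable, so clamp it to at least 1
        const int hw_threads = std::max(1, static_cast<int>(std::thread::hardware_concurrency()));
        num_thread = std::min(std::max(num_thread, 1), hw_threads);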
        token_capacity = std::max(token_capacity, 1);
        // Initialize model pool
        _models.reserve(num_thread);
        for (int i = 0; i < num_thread; ++i)
        {
            auto model = std::make_unique<ModelType>(model_cfg);
            if (!model || !model->initialize() || !model->is_loaded())
                throw std::runtime_error("Failed to create model instance.");
            _models.emplace_back(std::move(model));
        }
        // preprocess
        _preprocess = std::make_unique<InNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            std::shared_ptr<ItemType> item = nullptr;
            if (_stop_requested.load(std::memory_order_acquire))
            {
                if (_batch_collector) // Directly push incomplete batch to avoid dropping frames
                    item = std::move(_batch_collector);
            }
            else
            {
                if (!inp_.has_value())
                    return;
                if (!_batch_collector)
                    _batch_collector = std::make_shared<ItemType>();
                ModelHandlerType::collect(_batch_collector, *inp_);
                if (ModelHandlerType::get_batch_count(_batch_collector) == _batch_size)
                    item = std::move(_batch_collector);
            }
            if (!item)
                return;
            _batch_collector.reset();
            const uint64_t index = _model_idx.fetch_add(1, std::memory_order_relaxed) % _models.size();
            item->success = ModelHandlerType::preprocess(*_models[index], item);
            std::get<0>(outp_).try_put(std::make_tuple(std::move(item), index));
        });
        // inference
        _inference = std::make_unique<ExNode>(g, num_thread, [this](auto &&inp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::inference(*_models[index], item); // Only the inference step needs to be bound to a model instance
            return inp_;
        });
        // postprocess
        _postprocess = std::make_unique<OuNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::postprocess(*_models[index], item);
            _unpack(item, outp_);
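            // Notify limiter_node to release one token. Note: this releases one token per batch,
            // while the limiter admits one token per frame, so the two rates differ when batch_size_ > 1.
            _limiter->decrementer().try_put(tbb::flow::continue_msg());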
        });
        // make_edge
        _limiter = std::make_unique<tbb::flow::limiter_node<InputType>>(g, token_capacity);
        tbb::flow::make_edge(*_limiter, *_preprocess);
        tbb::flow::make_edge(tbb::flow::output_port<0>(*_preprocess), *_inference);
        tbb::flow::make_edge(*_inference, *_postprocess);
    }

private:
    std::atomic<bool> _stop_requested = false;
    int _batch_size = 1;
    std::shared_ptr<ItemType> _batch_collector = nullptr;
    std::unique_ptr<tbb::flow::limiter_node<InputType>> _limiter = nullptr;
    std::unique_ptr<InNode> _preprocess = nullptr;
    std::unique_ptr<ExNode> _inference = nullptr;
    std::unique_ptr<OuNode> _postprocess = nullptr;
    std::atomic<uint64_t> _model_idx = 0;            // avoid data races when assigning model index
    std::vector<std::unique_ptr<ModelType>> _models; // model pool (length == parallelism)
    std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> _unpack = nullptr;
};
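
In the listing above, the limiter sits in front of `_preprocess` and admits individual frames, while `_postprocess` sends one `continue_msg` per batch. The following sketch models only that token accounting, with no TBB involved, to show how the in-flight count behaves under the two release policies. `TokenLimiter` and `frames_admitted` are hypothetical names, and the model assumes each completed batch is processed instantly, which the real pipeline does not do.

```cpp
#include <cassert>
#include <cstddef>

// Simplified stand-in for a counting limiter: admits while count < threshold.
struct TokenLimiter {
    std::size_t threshold;
    std::size_t count = 0;

    bool try_admit() {
        if (count >= threshold)
            return false;
        ++count;
        return true;
    }

    void release(std::size_t n) {
        assert(n <= count);
        count -= n;
    }
};

// Feeds frames until the limiter refuses one or max_frames is reached.
// Returns the number of frames admitted.
std::size_t frames_admitted(std::size_t threshold, std::size_t batch_size,
                            bool release_per_frame, std::size_t max_frames) {
    TokenLimiter limiter{threshold};
    std::size_t admitted = 0;
    std::size_t pending = 0; // frames collected into the current batch
    while (admitted < max_frames && limiter.try_admit()) {
        ++admitted;
        if (++pending == batch_size) {
            // Batch complete; assume it is processed immediately.
            limiter.release(release_per_frame ? batch_size : 1);
            pending = 0;
        }
    }
    return admitted;
}
```

Under this simplified model, `frames_admitted(64, 4, false, 1000)` returns 85: each batch of 4 leaves 3 tokens unreleased, so the limiter saturates after 21 full batches plus one frame. `frames_admitted(64, 4, true, 1000)` and `frames_admitted(64, 1, false, 1000)` both return 1000. If the real graph shows throughput dropping to zero after a while with a batch size above 1, this accounting may be worth checking.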

Thanks in advance for any guidance.
