Skip to content

Commit 200080d

Browse files
committed
RedisConnection: enhance WriteQueueItem & related usages
1 parent e7951c4 commit 200080d

File tree

2 files changed

+129
-89
lines changed

2 files changed

+129
-89
lines changed

lib/icingadb/redisconnection.cpp

Lines changed: 115 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ void RedisConnection::FireAndForgetQuery(Query query, QueryAffects affects, bool
171171
auto item (Shared<Query>::Make(std::move(query)));
172172

173173
asio::post(m_Strand, [this, item, highPriority, affects, ctime = std::chrono::steady_clock::now()]() {
174-
auto qitem = WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects, highPriority};
174+
auto qitem = WriteQueueItem{item, ctime, affects, highPriority};
175175
if (highPriority) {
176176
m_Queues.PushFront(std::move(qitem));
177177
} else {
@@ -201,7 +201,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Que
201201
auto item (Shared<Queries>::Make(std::move(queries)));
202202

203203
asio::post(m_Strand, [this, item, affects, ctime = std::chrono::steady_clock::now()]() {
204-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
204+
m_Queues.Writes.push_back(WriteQueueItem{item, ctime, affects});
205205
m_QueuedWrites.Set();
206206
IncreasePendingQueries(item->size());
207207
});
@@ -228,7 +228,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
228228
auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
229229

230230
asio::post(m_Strand, [this, item, affects, ctime = std::chrono::steady_clock::now()]() {
231-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
231+
m_Queues.Writes.push_back(WriteQueueItem{item, ctime, affects});
232232
m_QueuedWrites.Set();
233233
IncreasePendingQueries(1);
234234
});
@@ -261,7 +261,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, Q
261261
auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
262262

263263
asio::post(m_Strand, [this, item, highPriority, affects, ctime = std::chrono::steady_clock::now()]() {
264-
auto qitem = WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects, highPriority};
264+
auto qitem = WriteQueueItem{item, ctime, affects, highPriority};
265265
if (highPriority) {
266266
m_Queues.PushFront(std::move(qitem));
267267
} else {
@@ -281,7 +281,7 @@ void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yiel
281281
AssertNotStopped();
282282

283283
asio::post(m_Strand, [this, callback, ctime = std::chrono::steady_clock::now()]() {
284-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
284+
m_Queues.Writes.push_back(WriteQueueItem{callback, ctime});
285285
m_QueuedWrites.Set();
286286
});
287287
}
@@ -516,7 +516,14 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
516516
auto queuedWrite(std::move(m_Queues.Writes.front()));
517517
m_Queues.Writes.pop_front();
518518

519-
WriteItem(yc, std::move(queuedWrite));
519+
std::visit(
520+
[this, &yc, &queuedWrite](const auto& item) {
521+
if (WriteItem(item, yc)) {
522+
RecordAffected(queuedWrite.Affects, Utility::GetTime());
523+
}
524+
},
525+
queuedWrite.Item
526+
);
520527
}
521528

522529
m_QueuedWrites.Clear();
@@ -559,111 +566,138 @@ void RedisConnection::LogStats(asio::yield_context& yc)
559566
}
560567

561568
/**
562-
* Send next and schedule receiving the response
569+
* Write a single Redis query in a fire-and-forget manner.
570+
*
571+
* @param item Redis query
563572
*
564-
* @param next Redis queries
573+
* @return true on success, false on failure.
565574
*/
566-
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
575+
bool RedisConnection::WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc)
567576
{
568-
if (next.FireAndForgetQuery) {
569-
auto& item (*next.FireAndForgetQuery);
570-
DecreasePendingQueries(1);
577+
DecreasePendingQueries(1);
571578

572-
try {
573-
WriteOne(item, yc);
574-
} catch (const std::exception& ex) {
575-
Log msg (LogCritical, "IcingaDB", "Error during sending query");
576-
LogQuery(item, msg);
577-
msg << " which has been fired and forgotten: " << ex.what();
579+
try {
580+
WriteOne(*item, yc);
581+
} catch (const std::exception& ex) {
582+
Log msg (LogCritical, "IcingaDB", "Error during sending query");
583+
LogQuery(*item, msg);
584+
msg << " which has been fired and forgotten: " << ex.what();
578585

579-
return;
580-
}
581-
582-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
583-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
584-
} else {
585-
++m_Queues.FutureResponseActions.back().Amount;
586-
}
586+
return false;
587+
}
587588

588-
m_QueuedReads.Set();
589+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
590+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
591+
} else {
592+
++m_Queues.FutureResponseActions.back().Amount;
589593
}
590594

591-
if (next.FireAndForgetQueries) {
592-
auto& item (*next.FireAndForgetQueries);
593-
size_t i = 0;
595+
m_QueuedReads.Set();
596+
return true;
597+
}
594598

595-
DecreasePendingQueries(item.size());
599+
/**
600+
* Write multiple Redis queries in a fire-and-forget manner.
601+
*
602+
* @param item Redis queries
603+
*
604+
* @return true on success, false on failure.
605+
*/
606+
bool RedisConnection::WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc)
607+
{
608+
size_t i = 0;
596609

597-
try {
598-
for (auto& query : item) {
599-
WriteOne(query, yc);
600-
++i;
601-
}
602-
} catch (const std::exception& ex) {
603-
Log msg (LogCritical, "IcingaDB", "Error during sending query");
604-
LogQuery(item[i], msg);
605-
msg << " which has been fired and forgotten: " << ex.what();
610+
DecreasePendingQueries(item->size());
606611

607-
return;
612+
try {
613+
for (auto& query : *item) {
614+
WriteOne(query, yc);
615+
++i;
608616
}
617+
} catch (const std::exception& ex) {
618+
Log msg (LogCritical, "IcingaDB", "Error during sending query");
619+
LogQuery((*item)[i], msg);
620+
msg << " which has been fired and forgotten: " << ex.what();
609621

610-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
611-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
612-
} else {
613-
m_Queues.FutureResponseActions.back().Amount += item.size();
614-
}
622+
return false;
623+
}
615624

616-
m_QueuedReads.Set();
625+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
626+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->size(), ResponseAction::Ignore});
627+
} else {
628+
m_Queues.FutureResponseActions.back().Amount += item->size();
617629
}
618630

619-
if (next.GetResultOfQuery) {
620-
auto& item (*next.GetResultOfQuery);
621-
DecreasePendingQueries(1);
631+
m_QueuedReads.Set();
632+
return true;
633+
}
622634

623-
try {
624-
WriteOne(item.first, yc);
625-
} catch (const std::exception&) {
626-
item.second.set_exception(std::current_exception());
635+
/**
636+
* Write a single Redis query and enqueue a response promise to be fulfilled once the response has been received.
637+
*
638+
* @param item Redis query and promise for the response
639+
*/
640+
bool RedisConnection::WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc)
641+
{
642+
DecreasePendingQueries(1);
627643

628-
return;
629-
}
644+
try {
645+
WriteOne(item->first, yc);
646+
} catch (const std::exception&) {
647+
item->second.set_exception(std::current_exception());
630648

631-
m_Queues.ReplyPromises.emplace(std::move(item.second));
649+
return false;
650+
}
632651

633-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
634-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
635-
} else {
636-
++m_Queues.FutureResponseActions.back().Amount;
637-
}
652+
m_Queues.ReplyPromises.push(std::move(item->second));
638653

639-
m_QueuedReads.Set();
654+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
655+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
656+
} else {
657+
++m_Queues.FutureResponseActions.back().Amount;
640658
}
641659

642-
if (next.GetResultsOfQueries) {
643-
auto& item (*next.GetResultsOfQueries);
644-
DecreasePendingQueries(item.first.size());
660+
m_QueuedReads.Set();
661+
return true;
662+
}
645663

646-
try {
647-
for (auto& query : item.first) {
648-
WriteOne(query, yc);
649-
}
650-
} catch (const std::exception&) {
651-
item.second.set_exception(std::current_exception());
664+
/**
665+
* Write multiple Redis queries and enqueue a response promise to be fulfilled once all responses have been received.
666+
*
667+
* @param item Redis queries and promise for the responses.
668+
*
669+
* @return true on success, false on failure.
670+
*/
671+
bool RedisConnection::WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc)
672+
{
673+
DecreasePendingQueries(item->first.size());
652674

653-
return;
675+
try {
676+
for (auto& query : item->first) {
677+
WriteOne(query, yc);
654678
}
679+
} catch (const std::exception&) {
680+
item->second.set_exception(std::current_exception());
655681

656-
m_Queues.RepliesPromises.emplace(std::move(item.second));
657-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
658-
659-
m_QueuedReads.Set();
682+
return false;
660683
}
661684

662-
if (next.Callback) {
663-
next.Callback(yc);
664-
}
685+
m_Queues.RepliesPromises.emplace(std::move(item->second));
686+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->first.size(), ResponseAction::DeliverBulk});
665687

666-
RecordAffected(next.Affects, Utility::GetTime());
688+
m_QueuedReads.Set();
689+
return true;
690+
}
691+
692+
/**
693+
* Invokes the provided callback immediately.
694+
*
695+
* @param item Callback to execute
696+
*/
697+
bool RedisConnection::WriteItem(const QueryCallback& item, boost::asio::yield_context& yc)
698+
{
699+
item(yc);
700+
return true;
667701
}
668702

669703
/**

lib/icingadb/redisconnection.hpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <stdexcept>
4242
#include <utility>
4343
#include <vector>
44+
#include <variant>
4445

4546
namespace icinga
4647
{
@@ -135,19 +136,20 @@ namespace icinga
135136
ResponseAction Action;
136137
};
137138

139+
using FireAndForgetQ = Shared<Query>::Ptr; // A single query that does not expect a result.
140+
using FireAndForgetQs = Shared<Queries>::Ptr; // Multiple queries that do not expect results.
141+
using QueryWithPromise = Shared<std::pair<Query, std::promise<Reply>>>::Ptr; // A single query expecting a result.
142+
using QueriesWithPromise = Shared<std::pair<Queries, std::promise<Replies>>>::Ptr; // Multiple queries expecting results.
143+
using QueryCallback = std::function<void(boost::asio::yield_context&)>; // A callback to be executed.
144+
138145
/**
139-
* Something to be send to Redis.
146+
* An item in the write queue to be sent to Redis.
140147
*
141148
* @ingroup icingadb
142149
*/
143150
struct WriteQueueItem
144151
{
145-
Shared<Query>::Ptr FireAndForgetQuery;
146-
Shared<Queries>::Ptr FireAndForgetQueries;
147-
Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery;
148-
Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries;
149-
std::function<void(boost::asio::yield_context&)> Callback;
150-
152+
std::variant<FireAndForgetQ, FireAndForgetQs, QueryWithPromise, QueriesWithPromise, QueryCallback> Item;
151153
std::chrono::steady_clock::time_point CTime; // When was this item queued?
152154
QueryAffects Affects;
153155
bool HighPriority; // Does this item have high priority?
@@ -178,7 +180,11 @@ namespace icinga
178180
void ReadLoop(boost::asio::yield_context& yc);
179181
void WriteLoop(boost::asio::yield_context& yc);
180182
void LogStats(boost::asio::yield_context& yc);
181-
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
183+
bool WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc);
184+
bool WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc);
185+
bool WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc);
186+
bool WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc);
187+
bool WriteItem(const QueryCallback& item, boost::asio::yield_context& yc);
182188
Reply ReadOne(boost::asio::yield_context& yc);
183189
void WriteOne(Query& query, boost::asio::yield_context& yc);
184190

0 commit comments

Comments
 (0)