@@ -260,9 +260,7 @@ export class MarQS {
260260 public async replaceMessage (
261261 messageId : string ,
262262 messageData : Record < string , unknown > ,
263- timestamp ?: number ,
264- priority ?: number ,
265- inplace ?: boolean
263+ timestamp ?: number
266264 ) {
267265 return this . #trace(
268266 "replaceMessage" ,
@@ -273,6 +271,57 @@ export class MarQS {
273271 return ;
274272 }
275273
274+ span . setAttributes ( {
275+ [ SemanticAttributes . QUEUE ] : oldMessage . queue ,
276+ [ SemanticAttributes . MESSAGE_ID ] : oldMessage . messageId ,
277+ [ SemanticAttributes . CONCURRENCY_KEY ] : oldMessage . concurrencyKey ,
278+ [ SemanticAttributes . PARENT_QUEUE ] : oldMessage . parentQueue ,
279+ } ) ;
280+
281+ const traceContext = {
282+ traceparent : oldMessage . data . traceparent ,
283+ tracestate : oldMessage . data . tracestate ,
284+ } ;
285+
286+ const newMessage : MessagePayload = {
287+ version : "1" ,
288+ // preserve original trace context
289+ data : { ...oldMessage . data , ...messageData , ...traceContext , queue : oldMessage . queue } ,
290+ queue : oldMessage . queue ,
291+ concurrencyKey : oldMessage . concurrencyKey ,
292+ timestamp : timestamp ?? Date . now ( ) ,
293+ messageId,
294+ parentQueue : oldMessage . parentQueue ,
295+ } ;
296+
297+ await this . #callReplaceMessage( newMessage ) ;
298+ } ,
299+ {
300+ kind : SpanKind . CONSUMER ,
301+ attributes : {
302+ [ SEMATTRS_MESSAGING_OPERATION ] : "replace" ,
303+ [ SEMATTRS_MESSAGE_ID ] : messageId ,
304+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
305+ } ,
306+ }
307+ ) ;
308+ }
309+
310+ public async requeueMessage (
311+ messageId : string ,
312+ messageData : Record < string , unknown > ,
313+ timestamp ?: number ,
314+ priority ?: number
315+ ) {
316+ return this . #trace(
317+ "requeueMessage" ,
318+ async ( span ) => {
319+ const oldMessage = await this . readMessage ( messageId ) ;
320+
321+ if ( ! oldMessage ) {
322+ return ;
323+ }
324+
276325 const queue = this . keys . queueKeyFromQueue ( oldMessage . queue , priority ) ;
277326
278327 span . setAttributes ( {
@@ -298,27 +347,16 @@ export class MarQS {
298347 parentQueue : oldMessage . parentQueue ,
299348 } ;
300349
301- if ( inplace ) {
302- await this . #callReplaceMessage( newMessage ) ;
303- return ;
304- }
305-
306350 await this . options . visibilityTimeoutStrategy . cancelHeartbeat ( messageId ) ;
307351
308- await this . #callAcknowledgeMessage( {
309- parentQueue : oldMessage . parentQueue ,
310- messageQueue : oldMessage . queue ,
311- messageId,
312- } ) ;
352+ await this . #callRequeueMessage( oldMessage . queue , newMessage ) ;
313353
314- await this . #callEnqueueMessage( newMessage ) ;
315-
316- await this . options . subscriber ?. messageReplaced ( newMessage ) ;
354+ await this . options . subscriber ?. messageRequeued ( oldMessage . queue , newMessage ) ;
317355 } ,
318356 {
319357 kind : SpanKind . CONSUMER ,
320358 attributes : {
321- [ SEMATTRS_MESSAGING_OPERATION ] : "replace " ,
359+ [ SEMATTRS_MESSAGING_OPERATION ] : "requeue " ,
322360 [ SEMATTRS_MESSAGE_ID ] : messageId ,
323361 [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
324362 } ,
@@ -602,7 +640,7 @@ export class MarQS {
602640 } ) ;
603641
604642 if ( updates ) {
605- await this . replaceMessage ( messageId , updates , retryAt , undefined , true ) ;
643+ await this . replaceMessage ( messageId , updates , retryAt ) ;
606644 }
607645
608646 await this . options . visibilityTimeoutStrategy . cancelHeartbeat ( messageId ) ;
@@ -1163,6 +1201,78 @@ export class MarQS {
11631201 ) ;
11641202 }
11651203
1204+ async #callRequeueMessage( oldQueue : string , message : MessagePayload ) {
1205+ const queueKey = message . queue ;
1206+ const oldQueueKey = oldQueue ;
1207+ const parentQueueKey = message . parentQueue ;
1208+ const messageKey = this . keys . messageKey ( message . messageId ) ;
1209+ const queueCurrentConcurrencyKey = this . keys . queueCurrentConcurrencyKeyFromQueue ( message . queue ) ;
1210+ const queueReserveConcurrencyKey = this . keys . queueReserveConcurrencyKeyFromQueue ( message . queue ) ;
1211+ const envCurrentConcurrencyKey = this . keys . envCurrentConcurrencyKeyFromQueue ( message . queue ) ;
1212+ const envReserveConcurrencyKey = this . keys . envReserveConcurrencyKeyFromQueue ( message . queue ) ;
1213+ const envQueueKey = this . keys . envQueueKeyFromQueue ( message . queue ) ;
1214+
1215+ const queueName = message . queue ;
1216+ const oldQueueName = oldQueue ;
1217+ const messageId = message . messageId ;
1218+ const messageData = JSON . stringify ( message ) ;
1219+ const messageScore = String ( message . timestamp ) ;
1220+
1221+ logger . debug ( "Calling requeueMessage" , {
1222+ service : this . name ,
1223+ queueKey,
1224+ oldQueueKey,
1225+ parentQueueKey,
1226+ messageKey,
1227+ queueCurrentConcurrencyKey,
1228+ queueReserveConcurrencyKey,
1229+ envCurrentConcurrencyKey,
1230+ envReserveConcurrencyKey,
1231+ envQueueKey,
1232+ queueName,
1233+ oldQueueName,
1234+ messageId,
1235+ messageData,
1236+ messageScore,
1237+ } ) ;
1238+
1239+ const result = await this . redis . requeueMessage (
1240+ queueKey ,
1241+ oldQueueKey ,
1242+ parentQueueKey ,
1243+ messageKey ,
1244+ queueCurrentConcurrencyKey ,
1245+ queueReserveConcurrencyKey ,
1246+ envCurrentConcurrencyKey ,
1247+ envReserveConcurrencyKey ,
1248+ envQueueKey ,
1249+ queueName ,
1250+ oldQueueName ,
1251+ messageId ,
1252+ messageData ,
1253+ messageScore
1254+ ) ;
1255+
1256+ logger . debug ( "requeueMessage result" , {
1257+ service : this . name ,
1258+ queueKey,
1259+ parentQueueKey,
1260+ messageKey,
1261+ queueCurrentConcurrencyKey,
1262+ queueReserveConcurrencyKey,
1263+ envCurrentConcurrencyKey,
1264+ envReserveConcurrencyKey,
1265+ envQueueKey,
1266+ queueName,
1267+ messageId,
1268+ messageData,
1269+ messageScore,
1270+ result,
1271+ } ) ;
1272+
1273+ return true ;
1274+ }
1275+
11661276 async #callAcknowledgeMessage( {
11671277 parentQueue,
11681278 messageQueue,
@@ -1587,6 +1697,61 @@ redis.call('DEL', messageKey)
15871697` ,
15881698 } ) ;
15891699
1700+ this . redis . defineCommand ( "requeueMessage" , {
1701+ numberOfKeys : 9 ,
1702+ lua : `
1703+ local queueKey = KEYS[1]
1704+ local oldQueueKey = KEYS[2]
1705+ local parentQueueKey = KEYS[3]
1706+ local messageKey = KEYS[4]
1707+ local queueCurrentConcurrencyKey = KEYS[5]
1708+ local queueReserveConcurrencyKey = KEYS[6]
1709+ local envCurrentConcurrencyKey = KEYS[7]
1710+ local envReserveConcurrencyKey = KEYS[8]
1711+ local envQueueKey = KEYS[9]
1712+
1713+ local queueName = ARGV[1]
1714+ local oldQueueName = ARGV[2]
1715+ local messageId = ARGV[3]
1716+ local messageData = ARGV[4]
1717+ local messageScore = ARGV[5]
1718+
1719+ -- First remove the message from the old queue
1720+ redis.call('ZREM', oldQueueKey, messageId)
1721+
1722+ -- Write the new message data
1723+ redis.call('SET', messageKey, messageData)
1724+
1725+ -- Add the message to the new queue with a new score
1726+ redis.call('ZADD', queueKey, messageScore, messageId)
1727+ redis.call('ZADD', envQueueKey, messageScore, messageId)
1728+
1729+ -- Rebalance the parent queue (for the new queue)
1730+ local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
1731+ if #earliestMessage == 0 then
1732+ redis.call('ZREM', parentQueueKey, queueName)
1733+ else
1734+ redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName)
1735+ end
1736+
1737+ -- Rebalance the parent queue (for the old queue)
1738+ local earliestMessage = redis.call('ZRANGE', oldQueueKey, 0, 0, 'WITHSCORES')
1739+ if #earliestMessage == 0 then
1740+ redis.call('ZREM', parentQueueKey, oldQueueName)
1741+ else
1742+ redis.call('ZADD', parentQueueKey, earliestMessage[2], oldQueueName)
1743+ end
1744+
1745+ -- Clear all concurrency sets (combined from both scripts)
1746+ redis.call('SREM', queueCurrentConcurrencyKey, messageId)
1747+ redis.call('SREM', queueReserveConcurrencyKey, messageId)
1748+ redis.call('SREM', envCurrentConcurrencyKey, messageId)
1749+ redis.call('SREM', envReserveConcurrencyKey, messageId)
1750+
1751+ return true
1752+ ` ,
1753+ } ) ;
1754+
15901755 this . redis . defineCommand ( "nackMessage" , {
15911756 numberOfKeys : 7 ,
15921757 lua : `
@@ -1749,6 +1914,24 @@ declare module "ioredis" {
17491914 callback ?: Callback < void >
17501915 ) : Result < void , Context > ;
17511916
1917+ requeueMessage (
1918+ queueKey : string ,
1919+ oldQueueKey : string ,
1920+ parentQueueKey : string ,
1921+ messageKey : string ,
1922+ queueCurrentConcurrencyKey : string ,
1923+ queueReserveConcurrencyKey : string ,
1924+ envCurrentConcurrencyKey : string ,
1925+ envReserveConcurrencyKey : string ,
1926+ envQueueKey : string ,
1927+ queueName : string ,
1928+ oldQueueName : string ,
1929+ messageId : string ,
1930+ messageData : string ,
1931+ messageScore : string ,
1932+ callback ?: Callback < string >
1933+ ) : Result < string , Context > ;
1934+
17521935 acknowledgeMessage (
17531936 parentQueue : string ,
17541937 messageKey : string ,
0 commit comments