Commit 3aaf3d5

refactor: remove gen1 duration refs
1 parent d6275d8 commit 3aaf3d5

File tree

3 files changed: 32 additions & 24 deletions


influxdb3_write/src/write_buffer/mod.rs

Lines changed: 0 additions & 1 deletion

@@ -224,7 +224,6 @@ impl WriteBufferImpl {
             distinct_cache_provider: Arc::clone(&distinct_cache),
             persisted_files: Arc::clone(&persisted_files),
             parquet_cache: parquet_cache.clone(),
-            gen1_duration: wal_config.gen1_duration,
             max_size_per_parquet_file_bytes: max_memory_for_snapshot_bytes,
         }));

influxdb3_write/src/write_buffer/queryable_buffer.rs

Lines changed: 23 additions & 16 deletions

@@ -21,7 +21,7 @@ use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCac
 use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
 use influxdb3_id::{DbId, TableId};
 use influxdb3_wal::{
-    CatalogOp, Gen1Duration, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber,
+    CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber,
     WalOp, WriteBatch,
 };
 use iox_query::QueryChunk;
@@ -58,7 +58,7 @@ pub struct QueryableBuffer {
     /// Sends a notification to this watch channel whenever a snapshot info is persisted
     persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
     persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
-    gen1_duration: Gen1Duration,
+    buffer_chunk_interval: Duration,
     max_size_per_parquet_file_bytes: u64,
 }

@@ -71,7 +71,6 @@ pub struct QueryableBufferArgs {
     pub distinct_cache_provider: Arc<DistinctCacheProvider>,
     pub persisted_files: Arc<PersistedFiles>,
     pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
-    pub gen1_duration: Gen1Duration,
     pub max_size_per_parquet_file_bytes: u64,
 }

@@ -85,11 +84,11 @@ impl QueryableBuffer {
             distinct_cache_provider,
             persisted_files,
             parquet_cache,
-            gen1_duration,
             max_size_per_parquet_file_bytes,
         }: QueryableBufferArgs,
     ) -> Self {
-        let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
+        let buffer_chunk_interval = Duration::from_secs(60);
+        let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog), buffer_chunk_interval)));
         let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) =
             tokio::sync::watch::channel(None);
         Self {
@@ -103,8 +102,8 @@ impl QueryableBuffer {
             parquet_cache,
             persisted_snapshot_notify_rx,
             persisted_snapshot_notify_tx,
-            gen1_duration,
             max_size_per_parquet_file_bytes,
+            buffer_chunk_interval,
         }
     }

@@ -268,8 +267,8 @@ impl QueryableBuffer {
         let catalog = Arc::clone(&self.catalog);
         let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone();
         let parquet_cache = self.parquet_cache.clone();
-        let gen1_duration = self.gen1_duration;
         let max_size_per_parquet_file = self.max_size_per_parquet_file_bytes;
+        let chunk_interval = self.num_chunk_intervals_in_10m();

         tokio::spawn(async move {
             // persist the catalog if it has been updated
@@ -321,7 +320,7 @@ impl QueryableBuffer {
             Arc::from(persister.node_identifier_prefix()),
             wal_file_number,
             Arc::clone(&catalog),
-            gen1_duration.as_10m() as usize,
+            chunk_interval,
             Some(max_size_per_parquet_file),
         );

@@ -351,7 +350,7 @@ impl QueryableBuffer {
             Arc::from(persister.node_identifier_prefix()),
             wal_file_number,
             Arc::clone(&catalog),
-            gen1_duration.as_10m() as usize,
+            chunk_interval,
             None,
         );

@@ -453,6 +452,16 @@ impl QueryableBuffer {
         let buffer = self.buffer.read();
         buffer.find_overall_buffer_size_bytes()
     }
+
+    fn num_chunk_intervals_in_10m(&self) -> usize {
+        let ten_mins_secs = 600;
+        let chunk_interval_secs = self.buffer_chunk_interval.as_secs();
+        if chunk_interval_secs >= ten_mins_secs {
+            return 1;
+        }
+        let num_chunks_in_ten_mins = ten_mins_secs / self.buffer_chunk_interval.as_secs();
+        num_chunks_in_ten_mins as usize
+    }
 }

 async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
@@ -619,15 +628,17 @@ impl WalFileNotifier for QueryableBuffer {
 pub struct BufferState {
     pub db_to_table: HashMap<DbId, TableIdToBufferMap>,
     catalog: Arc<Catalog>,
+    chunk_interval: Duration,
 }

 type TableIdToBufferMap = HashMap<TableId, TableBuffer>;

 impl BufferState {
-    pub fn new(catalog: Arc<Catalog>) -> Self {
+    pub fn new(catalog: Arc<Catalog>, chunk_interval: Duration) -> Self {
         Self {
             db_to_table: HashMap::new(),
             catalog,
+            chunk_interval,
         }
     }

@@ -741,14 +752,14 @@ impl BufferState {

         let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default();
         // keep internal query buffer chunks divided by 1m
-        let one_min_ns = Duration::from_secs(60).as_nanos() as i64;
+        let one_min_ns = self.chunk_interval.as_nanos() as i64;

         for (table_id, table_chunks) in &write_batch.table_chunks {
             let table_buffer = database_buffer.entry(*table_id).or_insert_with(|| {
                 let table_def = db_schema
                     .table_definition_by_id(table_id)
                     .expect("table should exist");
-                TableBuffer::new(table_def.sort_key())
+                TableBuffer::new(table_def.sort_key(), self.chunk_interval)
             });

             let mut one_min_groups = HashMap::new();
@@ -1127,7 +1138,6 @@ mod tests {
                 .unwrap(),
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
-            gen1_duration: Gen1Duration::new_1m(),
             max_size_per_parquet_file_bytes: 4_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1281,7 +1291,6 @@ mod tests {
                 .unwrap(),
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
-            gen1_duration: Gen1Duration::new_1m(),
             max_size_per_parquet_file_bytes: 50_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1410,7 +1419,6 @@ mod tests {
                 .unwrap(),
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
-            gen1_duration: Gen1Duration::new_1m(),
             max_size_per_parquet_file_bytes: 2_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1544,7 +1552,6 @@ mod tests {
                 .unwrap(),
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
-            gen1_duration: Gen1Duration::new_1m(),
            max_size_per_parquet_file_bytes: 150_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
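
Note: the new num_chunk_intervals_in_10m helper replaces gen1_duration.as_10m() as the chunk count handed to the persist jobs. A minimal standalone sketch of the same logic, to illustrate its behavior (the free function and the example values are illustrative, not part of the commit):

use std::time::Duration;

// Mirrors QueryableBuffer::num_chunk_intervals_in_10m: how many buffer
// chunk intervals fit in a 10-minute persistence window.
fn num_chunk_intervals_in_10m(buffer_chunk_interval: Duration) -> usize {
    let ten_mins_secs = 600;
    let chunk_interval_secs = buffer_chunk_interval.as_secs();
    if chunk_interval_secs >= ten_mins_secs {
        // intervals of 10 minutes or more collapse to a single chunk
        return 1;
    }
    (ten_mins_secs / chunk_interval_secs) as usize
}

fn main() {
    // the hard-coded 60s default yields 10 chunks per 10-minute window,
    // presumably matching the old Gen1Duration::new_1m().as_10m() value
    assert_eq!(num_chunk_intervals_in_10m(Duration::from_secs(60)), 10);
    assert_eq!(num_chunk_intervals_in_10m(Duration::from_secs(120)), 5);
    // anything at or above 10 minutes is clamped to one chunk
    assert_eq!(num_chunk_intervals_in_10m(Duration::from_secs(900)), 1);
    // non-divisors of 600 truncate under integer division
    assert_eq!(num_chunk_intervals_in_10m(Duration::from_secs(420)), 1);
}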

influxdb3_write/src/write_buffer/table_buffer.rs

Lines changed: 9 additions & 7 deletions

@@ -14,7 +14,7 @@ use influxdb3_wal::{FieldData, Row};
 use observability_deps::tracing::error;
 use schema::sort::SortKey;
 use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
-use std::collections::BTreeMap;
+use std::{collections::BTreeMap, time::Duration};
 use std::mem::size_of;
 use std::sync::Arc;
 use std::{collections::btree_map::Entry, slice::Iter};
@@ -37,14 +37,16 @@ pub struct TableBuffer {
     pub(crate) chunk_time_to_chunks: BTreeMap<i64, MutableTableChunk>,
     pub(crate) snapshotting_chunks: Vec<SnapshotChunk>,
     pub(crate) sort_key: SortKey,
+    pub(crate) _chunk_interval: Duration,
 }

 impl TableBuffer {
-    pub fn new(sort_key: SortKey) -> Self {
+    pub fn new(sort_key: SortKey, chunk_interval: Duration) -> Self {
         Self {
             chunk_time_to_chunks: BTreeMap::default(),
             snapshotting_chunks: Vec::new(),
             sort_key,
+            _chunk_interval: chunk_interval,
         }
     }

@@ -692,7 +694,7 @@ mod tests {

         let table_def = writer.db_schema().table_definition("tbl").unwrap();

-        let mut table_buffer = TableBuffer::new(SortKey::empty());
+        let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
         for (rows, offset) in row_batches {
             table_buffer.buffer_chunk(offset, &rows);
         }
@@ -742,16 +744,16 @@ mod tests {
             0,
         );

-        let mut table_buffer = TableBuffer::new(SortKey::empty());
+        let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
         table_buffer.buffer_chunk(0, &rows);

         let size = table_buffer.computed_size();
-        assert_eq!(size, 17769);
+        assert_eq!(size, 17785);
     }

     #[test]
     fn timestamp_min_max_works_when_empty() {
-        let table_buffer = TableBuffer::new(SortKey::empty());
+        let table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
         let timestamp_min_max = table_buffer.timestamp_min_max();
         assert_eq!(timestamp_min_max.min, 0);
         assert_eq!(timestamp_min_max.max, 0);
@@ -777,7 +779,7 @@ mod tests {
             row_batches.push((offset, rows));
         }
         let table_def = writer.db_schema().table_definition("tbl").unwrap();
-        let mut table_buffer = TableBuffer::new(SortKey::empty());
+        let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));

         for (offset, rows) in row_batches {
             table_buffer.buffer_chunk(offset, &rows);
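
Note: TableBuffer now receives the chunk interval but stores it as the underscore-prefixed _chunk_interval, so nothing reads it yet; the interval-based grouping still happens in BufferState via the one_min_ns computation shown above. A minimal sketch of that kind of bucketing, assuming the usual floor-to-interval scheme (this helper is hypothetical, not part of the commit):

use std::time::Duration;

// Hypothetical helper: bucket a nanosecond timestamp to the start of its
// chunk interval, the way one_min_ns is used to group rows by chunk time.
fn chunk_time_for(timestamp_ns: i64, chunk_interval: Duration) -> i64 {
    let interval_ns = chunk_interval.as_nanos() as i64;
    timestamp_ns - (timestamp_ns % interval_ns)
}

fn main() {
    let one_min = Duration::from_secs(60);
    // 90s after the epoch falls into the chunk that starts at 60s
    assert_eq!(chunk_time_for(90_000_000_000, one_min), 60_000_000_000);
    // with a 120s interval the same row lands in the chunk starting at 0
    assert_eq!(chunk_time_for(90_000_000_000, Duration::from_secs(120)), 0);
}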
