Skip to content

Commit d493deb

Browse files
refactor: remove gen1 duration refs
1 parent d6275d8 commit d493deb

File tree

3 files changed

+36
-25
lines changed

3 files changed

+36
-25
lines changed

influxdb3_write/src/write_buffer/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ impl WriteBufferImpl {
224224
distinct_cache_provider: Arc::clone(&distinct_cache),
225225
persisted_files: Arc::clone(&persisted_files),
226226
parquet_cache: parquet_cache.clone(),
227-
gen1_duration: wal_config.gen1_duration,
228227
max_size_per_parquet_file_bytes: max_memory_for_snapshot_bytes,
229228
}));
230229

influxdb3_write/src/write_buffer/queryable_buffer.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCac
2121
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
2222
use influxdb3_id::{DbId, TableId};
2323
use influxdb3_wal::{
24-
CatalogOp, Gen1Duration, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber,
25-
WalOp, WriteBatch,
24+
CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp,
25+
WriteBatch,
2626
};
2727
use iox_query::QueryChunk;
2828
use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
@@ -58,7 +58,7 @@ pub struct QueryableBuffer {
5858
/// Sends a notification to this watch channel whenever a snapshot info is persisted
5959
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
6060
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
61-
gen1_duration: Gen1Duration,
61+
buffer_chunk_interval: Duration,
6262
max_size_per_parquet_file_bytes: u64,
6363
}
6464

@@ -71,7 +71,6 @@ pub struct QueryableBufferArgs {
7171
pub distinct_cache_provider: Arc<DistinctCacheProvider>,
7272
pub persisted_files: Arc<PersistedFiles>,
7373
pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
74-
pub gen1_duration: Gen1Duration,
7574
pub max_size_per_parquet_file_bytes: u64,
7675
}
7776

@@ -85,11 +84,14 @@ impl QueryableBuffer {
8584
distinct_cache_provider,
8685
persisted_files,
8786
parquet_cache,
88-
gen1_duration,
8987
max_size_per_parquet_file_bytes,
9088
}: QueryableBufferArgs,
9189
) -> Self {
92-
let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
90+
let buffer_chunk_interval = Duration::from_secs(60);
91+
let buffer = Arc::new(RwLock::new(BufferState::new(
92+
Arc::clone(&catalog),
93+
buffer_chunk_interval,
94+
)));
9395
let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) =
9496
tokio::sync::watch::channel(None);
9597
Self {
@@ -103,8 +105,8 @@ impl QueryableBuffer {
103105
parquet_cache,
104106
persisted_snapshot_notify_rx,
105107
persisted_snapshot_notify_tx,
106-
gen1_duration,
107108
max_size_per_parquet_file_bytes,
109+
buffer_chunk_interval,
108110
}
109111
}
110112

@@ -268,8 +270,8 @@ impl QueryableBuffer {
268270
let catalog = Arc::clone(&self.catalog);
269271
let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone();
270272
let parquet_cache = self.parquet_cache.clone();
271-
let gen1_duration = self.gen1_duration;
272273
let max_size_per_parquet_file = self.max_size_per_parquet_file_bytes;
274+
let chunk_interval = self.num_chunk_intervals_in_10m();
273275

274276
tokio::spawn(async move {
275277
// persist the catalog if it has been updated
@@ -321,7 +323,7 @@ impl QueryableBuffer {
321323
Arc::from(persister.node_identifier_prefix()),
322324
wal_file_number,
323325
Arc::clone(&catalog),
324-
gen1_duration.as_10m() as usize,
326+
chunk_interval,
325327
Some(max_size_per_parquet_file),
326328
);
327329

@@ -351,7 +353,7 @@ impl QueryableBuffer {
351353
Arc::from(persister.node_identifier_prefix()),
352354
wal_file_number,
353355
Arc::clone(&catalog),
354-
gen1_duration.as_10m() as usize,
356+
chunk_interval,
355357
None,
356358
);
357359

@@ -453,6 +455,16 @@ impl QueryableBuffer {
453455
let buffer = self.buffer.read();
454456
buffer.find_overall_buffer_size_bytes()
455457
}
458+
459+
fn num_chunk_intervals_in_10m(&self) -> usize {
460+
let ten_mins_secs = 600;
461+
let chunk_interval_secs = self.buffer_chunk_interval.as_secs();
462+
if chunk_interval_secs >= ten_mins_secs {
463+
return 1;
464+
}
465+
let num_chunks_in_ten_mins = ten_mins_secs / self.buffer_chunk_interval.as_secs();
466+
num_chunks_in_ten_mins as usize
467+
}
456468
}
457469

458470
async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
@@ -619,15 +631,17 @@ impl WalFileNotifier for QueryableBuffer {
619631
pub struct BufferState {
620632
pub db_to_table: HashMap<DbId, TableIdToBufferMap>,
621633
catalog: Arc<Catalog>,
634+
chunk_interval: Duration,
622635
}
623636

624637
type TableIdToBufferMap = HashMap<TableId, TableBuffer>;
625638

626639
impl BufferState {
627-
pub fn new(catalog: Arc<Catalog>) -> Self {
640+
pub fn new(catalog: Arc<Catalog>, chunk_interval: Duration) -> Self {
628641
Self {
629642
db_to_table: HashMap::new(),
630643
catalog,
644+
chunk_interval,
631645
}
632646
}
633647

@@ -741,14 +755,14 @@ impl BufferState {
741755

742756
let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default();
743757
// keep internal query buffer chunks divided by 1m
744-
let one_min_ns = Duration::from_secs(60).as_nanos() as i64;
758+
let one_min_ns = self.chunk_interval.as_nanos() as i64;
745759

746760
for (table_id, table_chunks) in &write_batch.table_chunks {
747761
let table_buffer = database_buffer.entry(*table_id).or_insert_with(|| {
748762
let table_def = db_schema
749763
.table_definition_by_id(table_id)
750764
.expect("table should exist");
751-
TableBuffer::new(table_def.sort_key())
765+
TableBuffer::new(table_def.sort_key(), self.chunk_interval)
752766
});
753767

754768
let mut one_min_groups = HashMap::new();
@@ -1127,7 +1141,6 @@ mod tests {
11271141
.unwrap(),
11281142
persisted_files: Arc::new(PersistedFiles::new()),
11291143
parquet_cache: None,
1130-
gen1_duration: Gen1Duration::new_1m(),
11311144
max_size_per_parquet_file_bytes: 4_000,
11321145
};
11331146
let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1281,7 +1294,6 @@ mod tests {
12811294
.unwrap(),
12821295
persisted_files: Arc::new(PersistedFiles::new()),
12831296
parquet_cache: None,
1284-
gen1_duration: Gen1Duration::new_1m(),
12851297
max_size_per_parquet_file_bytes: 50_000,
12861298
};
12871299
let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1410,7 +1422,6 @@ mod tests {
14101422
.unwrap(),
14111423
persisted_files: Arc::new(PersistedFiles::new()),
14121424
parquet_cache: None,
1413-
gen1_duration: Gen1Duration::new_1m(),
14141425
max_size_per_parquet_file_bytes: 2_000,
14151426
};
14161427
let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
@@ -1544,7 +1555,6 @@ mod tests {
15441555
.unwrap(),
15451556
persisted_files: Arc::new(PersistedFiles::new()),
15461557
parquet_cache: None,
1547-
gen1_duration: Gen1Duration::new_1m(),
15481558
max_size_per_parquet_file_bytes: 150_000,
15491559
};
15501560
let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);

influxdb3_write/src/write_buffer/table_buffer.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ use influxdb3_wal::{FieldData, Row};
1414
use observability_deps::tracing::error;
1515
use schema::sort::SortKey;
1616
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
17-
use std::collections::BTreeMap;
1817
use std::mem::size_of;
1918
use std::sync::Arc;
19+
use std::{collections::BTreeMap, time::Duration};
2020
use std::{collections::btree_map::Entry, slice::Iter};
2121
use thiserror::Error;
2222

@@ -37,14 +37,16 @@ pub struct TableBuffer {
3737
pub(crate) chunk_time_to_chunks: BTreeMap<i64, MutableTableChunk>,
3838
pub(crate) snapshotting_chunks: Vec<SnapshotChunk>,
3939
pub(crate) sort_key: SortKey,
40+
pub(crate) _chunk_interval: Duration,
4041
}
4142

4243
impl TableBuffer {
43-
pub fn new(sort_key: SortKey) -> Self {
44+
pub fn new(sort_key: SortKey, chunk_interval: Duration) -> Self {
4445
Self {
4546
chunk_time_to_chunks: BTreeMap::default(),
4647
snapshotting_chunks: Vec::new(),
4748
sort_key,
49+
_chunk_interval: chunk_interval,
4850
}
4951
}
5052

@@ -692,7 +694,7 @@ mod tests {
692694

693695
let table_def = writer.db_schema().table_definition("tbl").unwrap();
694696

695-
let mut table_buffer = TableBuffer::new(SortKey::empty());
697+
let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
696698
for (rows, offset) in row_batches {
697699
table_buffer.buffer_chunk(offset, &rows);
698700
}
@@ -742,16 +744,16 @@ mod tests {
742744
0,
743745
);
744746

745-
let mut table_buffer = TableBuffer::new(SortKey::empty());
747+
let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
746748
table_buffer.buffer_chunk(0, &rows);
747749

748750
let size = table_buffer.computed_size();
749-
assert_eq!(size, 17769);
751+
assert_eq!(size, 17785);
750752
}
751753

752754
#[test]
753755
fn timestamp_min_max_works_when_empty() {
754-
let table_buffer = TableBuffer::new(SortKey::empty());
756+
let table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
755757
let timestamp_min_max = table_buffer.timestamp_min_max();
756758
assert_eq!(timestamp_min_max.min, 0);
757759
assert_eq!(timestamp_min_max.max, 0);
@@ -777,7 +779,7 @@ mod tests {
777779
row_batches.push((offset, rows));
778780
}
779781
let table_def = writer.db_schema().table_definition("tbl").unwrap();
780-
let mut table_buffer = TableBuffer::new(SortKey::empty());
782+
let mut table_buffer = TableBuffer::new(SortKey::empty(), Duration::from_secs(60));
781783

782784
for (offset, rows) in row_batches {
783785
table_buffer.buffer_chunk(offset, &rows);

0 commit comments

Comments
 (0)