1- use crate :: paths:: ParquetFilePath ;
21use crate :: persister:: Persister ;
32use crate :: write_buffer:: persisted_files:: PersistedFiles ;
43use crate :: write_buffer:: table_buffer:: TableBuffer ;
54use crate :: { ChunkFilter , ParquetFile , ParquetFileId , PersistedSnapshot } ;
65use crate :: { chunk:: BufferChunk , write_buffer:: table_buffer:: SnaphotChunkIter } ;
6+ use crate :: { paths:: ParquetFilePath , write_buffer:: table_buffer:: array_ref_nulls_for_type} ;
77use anyhow:: Context ;
8- use arrow:: {
9- array:: { AsArray , UInt64Array } ,
10- compute:: take,
11- datatypes:: TimestampNanosecondType ,
12- record_batch:: RecordBatch ,
13- } ;
8+ use arrow:: record_batch:: RecordBatch ;
149use async_trait:: async_trait;
1510use data_types:: {
1611 ChunkId , ChunkOrder , PartitionHashId , PartitionId , PartitionKey , TimestampMinMax ,
@@ -24,7 +19,10 @@ use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle};
2419use influxdb3_cache:: { distinct_cache:: DistinctCacheProvider , last_cache:: LastCacheProvider } ;
2520use influxdb3_catalog:: catalog:: { Catalog , DatabaseSchema , TableDefinition } ;
2621use influxdb3_id:: { DbId , TableId } ;
27- use influxdb3_wal:: { CatalogOp , SnapshotDetails , WalContents , WalFileNotifier , WalOp , WriteBatch } ;
22+ use influxdb3_wal:: {
23+ CatalogOp , SnapshotDetails , WalContents , WalFileNotifier , WalFileSequenceNumber , WalOp ,
24+ WriteBatch ,
25+ } ;
2826use iox_query:: QueryChunk ;
2927use iox_query:: chunk_statistics:: { NoColumnRanges , create_chunk_statistics} ;
3028use iox_query:: exec:: Executor ;
@@ -36,9 +34,9 @@ use parking_lot::RwLock;
3634use parquet:: format:: FileMetaData ;
3735use schema:: Schema ;
3836use schema:: sort:: SortKey ;
39- use std:: sync :: Arc ;
37+ use std:: any :: Any ;
4038use std:: time:: Duration ;
41- use std:: { any :: Any , collections :: BTreeMap } ;
39+ use std:: { iter :: Peekable , slice :: Iter , sync :: Arc } ;
4240use tokio:: sync:: oneshot:: { self , Receiver } ;
4341use tokio:: task:: JoinSet ;
4442
@@ -217,6 +215,7 @@ impl QueryableBuffer {
217215
218216 let persist_job = PersistJob {
219217 database_id : * database_id,
218+ database_name : Arc :: clone ( & db_schema. name ) ,
220219 table_id : * table_id,
221220 table_name : Arc :: clone ( & table_name) ,
222221 chunk_time : chunk. chunk_time ,
@@ -231,14 +230,13 @@ impl QueryableBuffer {
231230 None ,
232231 ) ,
233232 // these clones are cheap and done one at a time
234- batch : chunk. record_batch . clone ( ) ,
233+ batch : vec ! [ chunk. record_batch. clone( ) ] ,
235234 schema : chunk. schema . clone ( ) ,
236235 timestamp_min_max : chunk. timestamp_min_max ,
237236 sort_key : sort_key. clone ( ) ,
238237 } ;
239238 persisting_chunks. push ( persist_job) ;
240- snapshot_chunks. push_back ( chunk) ;
241- // snapshot_chunks.add_one(chunk);
239+ snapshot_chunks. push ( chunk) ;
242240 debug ! ( ">>> finished with chunk" ) ;
243241 }
244242 }
@@ -322,6 +320,9 @@ impl QueryableBuffer {
322320 ) ) ) ;
323321
324322 sort_dedupe_parallel (
323+ Arc :: from ( persister. node_identifier_prefix ( ) ) ,
324+ wal_file_number,
325+ Arc :: clone ( & catalog) ,
325326 persist_jobs,
326327 & persister,
327328 executor,
@@ -421,7 +422,11 @@ impl QueryableBuffer {
421422 }
422423}
423424
425+ #[ allow( clippy:: too_many_arguments) ]
424426async fn sort_dedupe_parallel (
427+ host_prefix : Arc < str > ,
428+ wal_file_number : WalFileSequenceNumber ,
429+ catalog : Arc < Catalog > ,
425430 persist_jobs : Vec < PersistJob > ,
426431 persister : & Arc < Persister > ,
427432 executor : Arc < Executor > ,
@@ -430,10 +435,16 @@ async fn sort_dedupe_parallel(
430435 persisted_files : Arc < PersistedFiles > ,
431436 persisted_snapshot : Arc < Mutex < PersistedSnapshot > > ,
432437) {
433- // if gen1 duration is 1m we should combine upto 10 of them
434- // to create a single parquet file
438+ let iterator = PersistJobGroupedIterator :: new (
439+ & persist_jobs,
440+ Arc :: clone ( & host_prefix) ,
441+ wal_file_number,
442+ Arc :: clone ( & catalog) ,
443+ 10 ,
444+ ) ;
445+
435446 let mut set = JoinSet :: new ( ) ;
436- for persist_job in persist_jobs {
447+ for persist_job in iterator {
437448 let persister = Arc :: clone ( persister) ;
438449 let executor = Arc :: clone ( & executor) ;
439450 let persisted_snapshot = Arc :: clone ( & persisted_snapshot) ;
@@ -560,34 +571,7 @@ async fn sort_dedupe_serial(
560571 }
561572 }
562573
563- persisted_snapshot
564- . add_parquet_file ( database_id, table_id, parquet_file)
565- }
566- }
567-
568- #[ derive( Debug ) ]
569- struct MinMax {
570- min : i64 ,
571- max : i64 ,
572- }
573-
574- impl MinMax {
575- fn new ( min : i64 , max : i64 ) -> Self {
576- // this doesn't check if min < max, a lot of the times
577- // it's good to start with i64::MAX for min and i64::MIN
578- // for max in loops so this type unlike TimestampMinMax
579- // doesn't check this pre-condition
580- Self { min, max }
581- }
582-
583- fn update ( & mut self , other : i64 ) {
584- self . min = other. min ( self . min ) ;
585- self . max = other. max ( self . max ) ;
586- }
587-
588- fn to_ts_min_max ( & self ) -> TimestampMinMax {
589- // at this point min < max
590- TimestampMinMax :: new ( self . min , self . max )
574+ persisted_snapshot. add_parquet_file ( database_id, table_id, parquet_file)
591575 }
592576}
593577
@@ -768,16 +752,176 @@ impl BufferState {
768752#[ derive( Debug ) ]
769753struct PersistJob {
770754 database_id : DbId ,
755+ database_name : Arc < str > ,
771756 table_id : TableId ,
772757 table_name : Arc < str > ,
773758 chunk_time : i64 ,
774759 path : ParquetFilePath ,
775- batch : RecordBatch ,
760+ batch : Vec < RecordBatch > ,
776761 schema : Schema ,
777762 timestamp_min_max : TimestampMinMax ,
778763 sort_key : SortKey ,
779764}
780765
766+ struct PersistJobGroupedIterator < ' a > {
767+ iter : Peekable < Iter < ' a , PersistJob > > ,
768+ host_prefix : Arc < str > ,
769+ wal_file_number : WalFileSequenceNumber ,
770+ catalog : Arc < Catalog > ,
771+ chunk_size : usize ,
772+ }
773+
774+ impl < ' a > PersistJobGroupedIterator < ' a > {
775+ fn new (
776+ data : & ' a [ PersistJob ] ,
777+ host_prefix : Arc < str > ,
778+ wal_file_number : WalFileSequenceNumber ,
779+ catalog : Arc < Catalog > ,
780+ chunk_size : usize ,
781+ ) -> Self {
782+ PersistJobGroupedIterator {
783+ iter : data. iter ( ) . peekable ( ) ,
784+ host_prefix : Arc :: clone ( & host_prefix) ,
785+ wal_file_number,
786+ catalog,
787+ chunk_size,
788+ }
789+ }
790+ }
791+
792+ impl Iterator for PersistJobGroupedIterator < ' _ > {
793+ // This is a grouped persist job, since it includes exactly
794+ // same fields with only difference being each job has a vec
795+ // of batches, it's been reused for now. For clarity it might
796+ // be better to have different types to represent this state
797+ type Item = PersistJob ;
798+
799+ fn next ( & mut self ) -> Option < Self :: Item > {
800+ let current_data = self . iter . next ( ) ?;
801+ let current_table_id = & current_data. table_id ;
802+
803+ let mut ts_min_max = current_data. timestamp_min_max ;
804+
805+ let mut all_batches = Vec :: with_capacity ( self . chunk_size ) ;
806+ let mut all_schemas = Vec :: with_capacity ( self . chunk_size ) ;
807+ all_batches. extend_from_slice ( & current_data. batch ) ;
808+ all_schemas. push ( current_data. schema . clone ( ) ) ;
809+
810+ let mut min_chunk_time = current_data. chunk_time ;
811+ // currently this naively assumes all batches are the same
812+ // shape, but they may not be - in that case we should use
813+ // the most recent table defn to add null arrays for batches
814+ // with missing cols.
815+ while all_batches. len ( ) < self . chunk_size {
816+ if let Some ( next_data) = self . iter . peek ( ) {
817+ if next_data. table_id == * current_table_id {
818+ let next = self . iter . next ( ) . unwrap ( ) ;
819+ ts_min_max = ts_min_max. union ( & next. timestamp_min_max ) ;
820+ min_chunk_time = min_chunk_time. min ( next. chunk_time ) ;
821+ all_batches. extend_from_slice ( & next. batch ) ;
822+ all_schemas. push ( next. schema . clone ( ) ) ;
823+ } else {
824+ break ;
825+ }
826+ } else {
827+ break ;
828+ }
829+ }
830+
831+ // most recent table defn
832+ let table_defn = self
833+ . catalog
834+ . db_schema_by_id ( & current_data. database_id ) ?
835+ . table_definition_by_id ( & current_data. table_id ) ?;
836+
837+ let expected_schema = table_defn. schema . clone ( ) ;
838+ let batches_with_schema_mismatch: Vec < ( usize , RecordBatch ) > = all_batches
839+ . iter ( )
840+ . cloned ( )
841+ . enumerate ( )
842+ // TODO: check if these are in order..
843+ . filter ( |( idx, _) | {
844+ let schema = & all_schemas[ * idx] ;
845+ for field_1 in expected_schema. iter ( ) {
846+ let mut found_field = false ;
847+ for field_2 in schema. iter ( ) {
848+ if field_1. 1 . name ( ) == field_2. 1 . name ( ) {
849+ found_field = true ;
850+ break ;
851+ }
852+ }
853+
854+ if !found_field {
855+ return true ;
856+ }
857+ }
858+ false
859+ } )
860+ . collect ( ) ;
861+
862+ if !batches_with_schema_mismatch. is_empty ( ) {
863+ // we need to add the missing fields - as schema changes are additive, when there is
864+ // a mismatch it means new column has been added to table but the batches are missing
865+ // them.
866+ for ( idx, batch) in & batches_with_schema_mismatch {
867+ let mut cols = vec ! [ ] ;
868+ let new_schema = & table_defn. schema ;
869+ // pick it's current iox schema, to add the columns (making null for missing)
870+ let outdated_batch_schema = & all_schemas[ * idx] ;
871+ debug ! (
872+ ?outdated_batch_schema,
873+ ">>> outdated batch schema when aligning mismatched schema"
874+ ) ;
875+ for col_idx_with_field_details in new_schema. iter ( ) . enumerate ( ) {
876+ let ( col_idx, ( influx_col_type, field) ) = col_idx_with_field_details;
877+ let batch_field = outdated_batch_schema. field_by_name ( field. name ( ) ) ;
878+ let len = batch. columns ( ) [ 0 ] . len ( ) ;
879+ if batch_field. is_some ( ) {
880+ let col = Arc :: clone ( & batch. columns ( ) [ col_idx] ) ;
881+ cols. push ( col) ;
882+ } else {
883+ let null_array_col = array_ref_nulls_for_type ( influx_col_type, len) ;
884+ cols. push ( null_array_col) ;
885+ }
886+ }
887+
888+ let new_arrow_schema = new_schema. as_arrow ( ) ;
889+ debug ! (
890+ ?new_arrow_schema,
891+ ">>> new arrow schema for batch when aligning mismatched schema"
892+ ) ;
893+ let new_rec_batch = RecordBatch :: try_new ( new_arrow_schema, cols) . expect (
894+ "record batch to be created with new schema after fixing schema mismatch" ,
895+ ) ;
896+
897+ let _ = std:: mem:: replace ( & mut all_batches[ * idx] , new_rec_batch) ;
898+ }
899+ }
900+
901+ Some ( PersistJob {
902+ database_id : current_data. database_id ,
903+ database_name : Arc :: clone ( & current_data. database_name ) ,
904+ table_id : current_data. table_id ,
905+ path : ParquetFilePath :: new (
906+ & self . host_prefix ,
907+ & current_data. database_name ,
908+ current_data. database_id . as_u32 ( ) ,
909+ & current_data. table_name ,
910+ current_data. table_id . as_u32 ( ) ,
911+ min_chunk_time,
912+ self . wal_file_number ,
913+ None ,
914+ ) ,
915+ table_name : Arc :: clone ( & current_data. table_name ) ,
916+ chunk_time : min_chunk_time,
917+ batch : all_batches,
918+ schema : current_data. schema . clone ( ) ,
919+ timestamp_min_max : ts_min_max,
920+ sort_key : current_data. sort_key . clone ( ) ,
921+ } )
922+ }
923+ }
924+
781925pub ( crate ) struct SortDedupePersistSummary {
782926 pub file_size_bytes : u64 ,
783927 pub file_meta_data : FileMetaData ,
@@ -799,7 +943,7 @@ async fn sort_dedupe_persist(
799943) -> Result < SortDedupePersistSummary , anyhow:: Error > {
800944 // Dedupe and sort using the COMPACT query built into
801945 // iox_query
802- let row_count = persist_job. batch . num_rows ( ) ;
946+ let row_count = persist_job. batch . iter ( ) . map ( |batch| batch . num_rows ( ) ) . sum ( ) ;
803947 info ! (
804948 "Persisting {} rows for db id {} and table id {} and chunk {} to file {}" ,
805949 row_count,
@@ -818,7 +962,7 @@ async fn sort_dedupe_persist(
818962 ) ;
819963
820964 let chunks: Vec < Arc < dyn QueryChunk > > = vec ! [ Arc :: new( BufferChunk {
821- batches: vec! [ persist_job. batch] ,
965+ batches: persist_job. batch,
822966 schema: persist_job. schema. clone( ) ,
823967 stats: Arc :: new( chunk_stats) ,
824968 partition_id: TransitionPartitionId :: from_parts(
@@ -904,7 +1048,7 @@ mod tests {
9041048 use parquet_file:: storage:: { ParquetStorage , StorageId } ;
9051049 use std:: num:: NonZeroUsize ;
9061050
907- #[ tokio:: test]
1051+ #[ test_log :: test ( tokio:: test) ]
9081052 async fn snapshot_works_with_not_all_columns_in_buffer ( ) {
9091053 let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
9101054 let metrics = Arc :: new ( metric:: Registry :: default ( ) ) ;
0 commit comments