当前位置:网站首页>Arrow parquet 之 String Reader
Arrow parquet 之 String Reader
2022-08-09 15:46:00 【zhixingheyi_tian】
Switch
cpp/src/parquet/column_reader.cc
// Builds a record reader for a single leaf column.
// Stores the leaf's level info, zeroes every record/value/level counter,
// allocates the working buffers from `pool`, and finally calls Reset().
TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool)
    : BASE(descr, pool) {
  // Level information for this leaf, and whether its values can contain nulls.
  leaf_info_ = leaf_info;
  nullable_values_ = leaf_info.HasNullableValues();

  // All bookkeeping starts out empty; no record has been read yet.
  at_record_start_ = true;
  records_read_ = 0;
  values_written_ = 0;
  values_capacity_ = 0;
  null_count_ = 0;
  levels_written_ = 0;
  levels_position_ = 0;
  levels_capacity_ = 0;

  // The values_ buffer is only allocated for non-BYTE_ARRAY physical types.
  // NOTE(review): BYTE_ARRAY data is presumably accumulated elsewhere
  // (e.g. a binary builder in a subclass) — confirm.
  uses_values_ = descr->physical_type() != Type::BYTE_ARRAY;
  if (uses_values_) values_ = AllocateBuffer(pool);

  // Validity bitmap plus definition/repetition level buffers are always needed.
  valid_bits_ = AllocateBuffer(pool);
  def_levels_ = AllocateBuffer(pool);
  rep_levels_ = AllocateBuffer(pool);

  // Must run last: it operates on the state initialized above.
  Reset();
}
NextBatch 的调用流程
以下逻辑均位于 cpp/src/parquet/arrow/reader.cc 中：
// Lazily produces one RecordBatchIterator per call. Each call reads up to
// properties().batch_size() rows from every column reader (optionally in
// parallel) and wraps them in a TableBatchReader. Iteration ends when the rows
// counted in `num_rows` are used up, or when any column comes back empty.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
    [readers, batch_schema, num_rows,
     this]() mutable -> ::arrow::Result<RecordBatchIterator> {
      // don't reserve more rows than necessary
      int64_t batch_size = std::min(properties().batch_size(), num_rows);
      // Terminate explicitly once no rows remain (or the configured batch size is
      // non-positive). Without this, a projection with zero columns never ends:
      // the per-column emptiness check below has nothing to inspect, so End() is
      // never returned. For non-empty readers this matches the old behavior,
      // because NextBatch(0) can only yield zero-length columns.
      if (batch_size <= 0) {
        return ::arrow::IterationTraits<RecordBatchIterator>::End();
      }
      num_rows -= batch_size;

      ::arrow::ChunkedArrayVector columns(readers.size());
      RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
          reader_properties_.use_threads(), static_cast<int>(readers.size()),
          [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));
      for (const auto& column : columns) {
        if (column == nullptr || column->length() == 0) {
          return ::arrow::IterationTraits<RecordBatchIterator>::End();
        }
      }

      auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
      auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
      // NB: explicitly preserve table so that table_reader doesn't outlive it
      return ::arrow::MakeFunctionIterator(
          [table, table_reader] { return table_reader->Next(); });
    });
首先，根据元数据（metadata）累加所选各 row group 的行数，得到总行数 num_rows：
// Total row count over the selected row groups, taken from each row group's metadata.
int64_t num_rows = 0;
for (const int rg_index : row_groups) {
  const auto rg_rows = parquet_reader()->metadata()->RowGroup(rg_index)->num_rows();
  num_rows += rg_rows;
}
然后计算实际的 batch_size（不超过剩余行数），并从 num_rows 中扣除：
// don't reserve more rows than necessary
int64_t batch_size = std::min(properties().batch_size(), num_rows);
num_rows -= batch_size;
// Reads up to `batch_size` records for this column and stores the result in `*out`.
// Steps: LoadBatch() fills the record reader, BuildArray() materializes the
// chunked array, and every resulting chunk is then checked with Validate().
// Returns the first non-OK status encountered.
::arrow::Status NextBatch(int64_t batch_size,
                          std::shared_ptr<::arrow::ChunkedArray>* out) final {
  RETURN_NOT_OK(LoadBatch(batch_size));
  RETURN_NOT_OK(BuildArray(batch_size, out));
  for (const auto& chunk : (*out)->chunks()) {
    RETURN_NOT_OK(chunk->Validate());
  }
  return Status::OK();
}
NextRowGroup 与 TransferColumnData 的调用位置（均在 LoadBatch 中）：
// Fills record_reader_ with up to `records_to_read` records, moving across row
// groups as needed, then converts the buffered data into `out_` via
// TransferColumnData(). Parquet exceptions raised inside are converted to a
// Status by the BEGIN/END_PARQUET_CATCH_EXCEPTIONS macros.
Status LoadBatch(int64_t records_to_read) final {
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  // Discard any previous output and reset the record reader.
  out_ = nullptr;
  record_reader_->Reset();
  // Pre-allocation gives much better performance for flat columns
  record_reader_->Reserve(records_to_read);

  // Read until the request is satisfied or the reader reports no more data.
  // A read that returns zero records triggers a move to the next row group.
  int64_t remaining = records_to_read;
  while (remaining > 0 && record_reader_->HasMoreData()) {
    const int64_t got = record_reader_->ReadRecords(remaining);
    if (got == 0) {
      NextRowGroup();
      continue;
    }
    remaining -= got;
  }

  RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
                                   ctx_->pool, &out_));
  return Status::OK();
  END_PARQUET_CATCH_EXCEPTIONS
}
边栏推荐
猜你喜欢
数据可视化的类别及其重要性
B40 - 基于STM32单片机的电热蚊香蓝牙控制系统
MySQL 5.5系列安装步骤教程(图解版)
一个程序员的水平能差到什么程度?
自定义过滤器和拦截器实现ThreadLocal线程封闭
图像几何校正
网络——IPV4地址(二)
Leading practice | How the world's largest wine app uses design sprint to innovate the vivino model
B44 - Based on stm32 bluetooth intelligent voice recognition classification broadcast trash
网络——数字数据编码
随机推荐
Reasons for slow startup of IDEA (1)
AVL树的插入操作
ESP8266-Arduino编程实例-MQ-5液化天然气传感器驱动
PHP completes missing dates in date ranges/returns missing dates
打印星型图「建议收藏」
Leetcode——3.无重复字符的最长字串
NFT+IDO预售代币合约模式系统开发
初识C语言(1)
[1413. Stepwise summation to get the minimum value of positive numbers]
ESP8266-Arduino编程实例-MQ-4气体传感器驱动
国星光电吉利产业园项目主体结构全面封顶,将重点生产 RGB 小间距、Mini LED、TOP LED 等产品
SQL抖音面试题:送你一个万能模板,要吗?(重点、每个用户每月连续登录的最大天数)
想通这点,治好 AI 打工人的精神内耗
Apple Developer Account Apply for D-U-N-S Number
uniapp 项目搭建
A49 - ESP8266建立AP传输XPT2046AD数据WIFI模块
网络——IPV4地址(二)
网络——介质访问控制
IDEA启动缓慢原因(一)
B019 - 甲醛甲烷煤气温湿度时间测试仪