Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ParquetMetadataCache.h>
#include <Processors/Sources/ConstChunkGenerator.h>
Expand Down Expand Up @@ -770,6 +772,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (!column.second.second.type->isNullable())
continue;

/// With a View over an Iceberg table, we get something like 'materialize(time)' as column_name
ParserExpression parser;
ASTPtr expr = parseQuery(parser, column_name, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
if (expr->as<ASTFunction>())
continue;

/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,10 @@ def check_events(query_id, event, is_cluster, expected):
GROUP BY ALL
FORMAT CSV
""")
# Weird, but it looks like ReadFileMetadata does not use the local file cache in 26.1
# metadata.json always downloaded in 26.1, once per query or subquery
# In 25.8 count was equal to expected, in 26.1 it is expected * 3 + 1 for Local case
# expected * 3 + 4 for Cluster case, because each subquery loads metadata.json
assert int(res) == expected * 3 + (4 if is_cluster else 1)
assert int(res) == expected

event = "S3GetObject" if storage_type == "s3" else "AzureGetObject"
# Each file contains one row group, so the number of row groups read equals the number of data files read
event = "ParquetReadRowGroups"

# Without optimization clickhouse reads all 7 files
check_events(all_data_expected_query_id, event, run_on_cluster, 7)
Expand All @@ -212,4 +209,69 @@ def compare_selects(query):

compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL")
compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL")
compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL")
compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL")


@pytest.mark.parametrize("storage_type", ["s3", "azure"])
@pytest.mark.parametrize("run_on_cluster", [False, True])
def test_read_constant_columns_optimization_view(started_cluster_iceberg_with_spark, storage_type, run_on_cluster):
    """Check that the Iceberg constant-column read optimization returns the same
    results as the unoptimized path when the table is queried through a VIEW."""
    cluster = started_cluster_iceberg_with_spark
    instance = cluster.instances["node1"]
    spark = cluster.spark_session
    TABLE_NAME = (
        "test_read_constant_columns_optimization_view_"
        + storage_type
        + "_"
        + get_uuid_str()
    )

    def execute_spark_query(query: str):
        # Thin wrapper binding the shared Spark session / storage / table context.
        return execute_spark_query_general(
            spark,
            cluster,
            storage_type,
            TABLE_NAME,
            query,
        )

    # Table partitioned by identity(tag) and years(date): those columns are
    # constant within each data file, which is what the optimization exploits.
    execute_spark_query(
        f"""
            CREATE TABLE {TABLE_NAME} (
                tag INT,
                date DATE,
                date2 DATE,
                name VARCHAR(50),
                number BIGINT
            )
            USING iceberg
            PARTITIONED BY (identity(tag), years(date))
            OPTIONS('format-version'='2')
        """
    )

    execute_spark_query(
        f"""
            INSERT INTO {TABLE_NAME} VALUES
            (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5),
            (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5),
            (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5),
            (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5),
            (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5),
            (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5)
        """
    )

    creation_expression = get_creation_expression(
        storage_type, TABLE_NAME, cluster, table_function=True, run_on_cluster=run_on_cluster
    )

    # Check that a view over the Iceberg table works
    instance.query(f"CREATE VIEW {TABLE_NAME}_view AS SELECT * FROM {creation_expression}")

    # Reference result with the optimization disabled.
    expected = instance.query(f"SELECT * FROM {TABLE_NAME}_view ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0")

    # Predicates exercising: all data, a constant column, partition columns,
    # and a non-constant column — each must match the reference result.
    for predicate in (
        "",
        "WHERE number=5 ",
        "WHERE tag>0 AND date>'2020-01-01' ",
        "WHERE date2!='2020-01-01' ",
    ):
        optimized = instance.query(
            f"SELECT * FROM {TABLE_NAME}_view {predicate}ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1"
        )
        assert expected == optimized
Loading