Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ public enum ErrorMsg {
@Deprecated // kept for backwards reference
REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true),
REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
VIEW_STORAGE_HANDLER_UNSUPPORTED(10448, "CREATE VIEW only supports STORED BY ICEBERG for native "
+ "Iceberg views; unsupported storage clause: {0}", true),
UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.SQLViewRepresentation;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewRepresentation;
import org.apache.thrift.TException;

public class MetastoreUtil {
Expand Down Expand Up @@ -148,6 +154,76 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co
return result;
}

/**
 * Builds a Hive metastore {@link Table} representation for an Iceberg {@link View}, for clients
 * (e.g. {@code HiveRESTCatalogClient}) that bridge Iceberg catalog metadata into the HMS API.
 *
 * <p>The returned object is a {@code VIRTUAL_VIEW} whose original/expanded text is taken from the
 * view's SQL representations (Hive dialect preferred) and whose parameters carry the Iceberg view
 * metadata location plus, when configured, the Iceberg catalog type.
 *
 * @param view the Iceberg view to translate; must be a {@code BaseView} so its current
 *     {@link ViewMetadata} can be read via {@code operations().current()}
 * @param conf used for the default HMS catalog/database, property-size limits and catalog type
 * @return an HMS {@link Table} of type {@code VIRTUAL_VIEW} mirroring the Iceberg view
 */
public static Table toHiveView(View view, Configuration conf) {
  Table result = new Table();
  // view.name() may be fully qualified; fall back to the configured default
  // catalog and default database for any missing qualifier parts.
  TableName tableName =
      TableName.fromString(
          view.name(), MetaStoreUtils.getDefaultCatalog(conf), Warehouse.DEFAULT_DATABASE_NAME);
  result.setCatName(tableName.getCat());
  result.setDbName(tableName.getDb());
  result.setTableName(tableName.getTable());
  result.setTableType(TableType.VIRTUAL_VIEW.toString());

  // Current view metadata is the source of truth for schema, SQL text and properties.
  ViewMetadata metadata = ((BaseView) view).operations().current();
  String sqlText = viewSqlText(view, metadata);
  // Hive keeps both the user-authored and the expanded text; the Iceberg view
  // only stores one SQL string, so the same text is used for both fields.
  result.setViewOriginalText(sqlText);
  result.setViewExpandedText(sqlText);

  // NOTE(review): create/last-access times are stamped with the current wall clock,
  // not the view's actual creation timestamp from metadata — confirm this is intended.
  long nowMillis = System.currentTimeMillis();
  int nowSec = (int) (nowMillis / 1000);
  // Owner comes from the table-owner property when present, else the JVM user.
  String owner =
      PropertyUtil.propertyAsString(
          metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, System.getProperty("user.name"));
  result.setOwner(owner);
  result.setCreateTime(nowSec);
  result.setLastAccessTime(nowSec);
  result.setRetention(Integer.MAX_VALUE);

  // hiveEngineEnabled=false: no storage-handler/serde registration is requested
  // for the view's storage descriptor.
  boolean hiveEngineEnabled = false;
  result.setSd(HiveOperationsBase.storageDescriptor(metadata.schema(), metadata.location(), hiveEngineEnabled));

  // Copy Iceberg view metadata (including the metadata file location) into HMS
  // table parameters, truncating values beyond the configured size limit.
  long maxHiveTablePropertySize =
      conf.getLong(
          HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
          HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
  HMSTablePropertyHelper.updateHmsTableForIcebergView(
      metadata.metadataFileLocation(),
      result,
      metadata,
      Collections.emptySet(),
      maxHiveTablePropertySize,
      null);

  // Record the catalog type (e.g. rest, hive) unless it is absent or explicitly "none".
  String catalogType = IcebergCatalogProperties.getCatalogType(conf);
  if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
    result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf));
  }
  return result;
}

/**
 * Resolves the SQL text to expose for a view: the representation registered for the
 * "hive" dialect when available, otherwise the first SQL representation found in the
 * current version's metadata, or the empty string when the view has no SQL at all.
 */
private static String viewSqlText(View view, ViewMetadata metadata) {
  // Fast path: ask the view directly for its Hive-dialect SQL.
  SQLViewRepresentation hiveSql = view.sqlFor("hive");
  if (hiveSql != null) {
    return hiveSql.sql();
  }

  // Fallback: scan every representation; an exact "hive" dialect match wins
  // immediately, otherwise remember the first SQL representation encountered.
  SQLViewRepresentation firstSqlSeen = null;
  for (ViewRepresentation candidate : metadata.currentVersion().representations()) {
    if (!(candidate instanceof SQLViewRepresentation)) {
      continue;
    }
    SQLViewRepresentation sqlCandidate = (SQLViewRepresentation) candidate;
    if (sqlCandidate.dialect().equalsIgnoreCase("hive")) {
      return sqlCandidate.sql();
    }
    if (firstSqlSeen == null) {
      firstSqlSeen = sqlCandidate;
    }
  }
  return firstSqlSeen == null ? "" : firstSqlSeen.sql();
}

private static StorageDescriptor getHiveStorageDescriptor(org.apache.iceberg.Table table) {
var result = new StorageDescriptor();
result.setCols(HiveSchemaUtil.convert(table.schema()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.view.ViewBuilder;

/**
 * Commits a native Iceberg view through the configured default Iceberg catalog (HiveCatalog or REST
 * catalog, etc.) when {@code Catalog} also implements {@link ViewCatalog}.
 */
public final class NativeIcebergViewSupport {

  /** HMS parameter aligned with Hive's {@code CreateViewDesc#ICEBERG_NATIVE_VIEW_PROPERTY}. */
  public static final String ICEBERG_NATIVE_VIEW_PROPERTY = "hive.iceberg.native.view";

  private NativeIcebergViewSupport() {
  }

  /**
   * Creates or replaces a view in the Iceberg catalog.
   *
   * @param conf Hadoop configuration used to resolve and instantiate the Iceberg catalog
   * @param databaseName database (namespace) of the view
   * @param viewName simple name of the view
   * @param fieldSchemas Hive column definitions converted into the view's Iceberg schema
   * @param viewSql view definition SQL, registered under the "hive" dialect
   * @param tblProperties optional table properties copied onto the view (null entries skipped)
   * @param comment optional view comment, stored as the "comment" property
   * @param replace whether to replace an existing view (CREATE OR REPLACE)
   * @param ifNotExists whether to silently skip creation when the view already exists
   * @return {@code false} if skipped because {@code ifNotExists} is true and the view already exists
   * @throws UnsupportedOperationException if the configured catalog does not implement ViewCatalog
   * @throws Exception on commit or catalog-close failure
   */
  public static boolean createOrReplaceNativeView(Configuration conf, String databaseName, String viewName,
      List<FieldSchema> fieldSchemas, String viewSql, Map<String, String> tblProperties, String comment,
      boolean replace, boolean ifNotExists) throws Exception {

    TableIdentifier identifier = TableIdentifier.of(databaseName, viewName);
    String catalogName = IcebergCatalogProperties.getCatalogName(conf);
    Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName);
    Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf);
    Exception primaryFailure = null;
    try {
      ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName);
      if (!replace && ifNotExists && viewCatalog.viewExists(identifier)) {
        return false;
      }

      ViewBuilder builder = startViewBuilder(viewCatalog, identifier, fieldSchemas, viewSql);
      builder = applyCommentAndTblProps(builder, tblProperties, comment);
      commitView(builder, replace);
      return true;
    } catch (Exception e) {
      primaryFailure = e;
      throw e;
    } finally {
      if (catalog instanceof Closeable) {
        try {
          ((Closeable) catalog).close();
        } catch (Exception closeFailure) {
          // Do not let a close() failure mask the original commit failure;
          // attach it as a suppressed exception instead.
          if (primaryFailure != null) {
            primaryFailure.addSuppressed(closeFailure);
          } else {
            throw closeFailure;
          }
        }
      }
    }
  }

  /** Casts the catalog to {@link ViewCatalog}, failing fast with an actionable message otherwise. */
  private static ViewCatalog asViewCatalog(Catalog catalog, String catalogName) {
    if (!(catalog instanceof ViewCatalog)) {
      throw new UnsupportedOperationException(
          String.format(
              "Iceberg catalog '%s' does not implement ViewCatalog.",
              catalogName) +
          " Native views require a catalog that implements ViewCatalog (e.g. HiveCatalog or REST).");
    }
    return (ViewCatalog) catalog;
  }

  /** Starts a view builder with the converted schema, default namespace, Hive SQL and native-view marker. */
  private static ViewBuilder startViewBuilder(
      ViewCatalog viewCatalog,
      TableIdentifier identifier,
      List<FieldSchema> fieldSchemas,
      String viewSql) {
    return viewCatalog
        .buildView(identifier)
        .withSchema(HiveSchemaUtil.convert(fieldSchemas, Collections.emptyMap(), true))
        // Use the identifier's full namespace rather than only its first level,
        // so multi-level namespaces resolve correctly too.
        .withDefaultNamespace(identifier.namespace())
        .withQuery("hive", viewSql)
        .withProperty(ICEBERG_NATIVE_VIEW_PROPERTY, "true");
  }

  /** Applies the optional comment and user-supplied properties; null keys/values are skipped. */
  private static ViewBuilder applyCommentAndTblProps(
      ViewBuilder builder, Map<String, String> tblProperties, String comment) {
    ViewBuilder viewBuilder = builder;
    if (comment != null && !comment.isEmpty()) {
      viewBuilder = viewBuilder.withProperty("comment", comment);
    }
    if (tblProperties != null) {
      for (Map.Entry<String, String> e : tblProperties.entrySet()) {
        if (e.getKey() != null && e.getValue() != null) {
          viewBuilder = viewBuilder.withProperty(e.getKey(), e.getValue());
        }
      }
    }
    return viewBuilder;
  }

  /** Commits the builder: create-or-replace for REPLACE semantics, plain create otherwise. */
  private static void commitView(ViewBuilder builder, boolean replace) {
    if (replace) {
      builder.createOrReplace();
    } else {
      builder.create();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
Expand All @@ -43,7 +45,9 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.IcebergCatalogProperties;
Expand All @@ -54,6 +58,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.view.View;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,10 +128,19 @@ public List<String> getTables(String catName, String dbName, String tablePattern
Pattern pattern = Pattern.compile(regex);

// List tables from the specific database (namespace) and filter them.
return restCatalog.listTables(Namespace.of(dbName)).stream()
Set<String> names = new LinkedHashSet<>();
restCatalog.listTables(Namespace.of(dbName)).stream()
.map(TableIdentifier::name)
.filter(pattern.asPredicate())
.toList();
.forEach(names::add);
if (restCatalog instanceof ViewCatalog) {
((ViewCatalog) restCatalog)
.listViews(Namespace.of(dbName)).stream()
.map(TableIdentifier::name)
.filter(pattern.asPredicate())
.forEach(names::add);
}
return Lists.newArrayList(names);
}

@Override
Expand All @@ -136,7 +150,12 @@ public List<String> getAllTables(String catName, String dbName) {

@Override
public void dropTable(Table table, boolean deleteData, boolean ignoreUnknownTab, boolean ifPurge) throws TException {
  TableIdentifier identifier = TableIdentifier.of(table.getDbName(), table.getTableName());
  // Views surfaced from the catalog must go through the view API; everything
  // else is dropped through the table path.
  boolean droppedAsView = false;
  if (restCatalog instanceof ViewCatalog) {
    ViewCatalog viewCatalog = (ViewCatalog) restCatalog;
    if (viewCatalog.viewExists(identifier)) {
      viewCatalog.dropView(identifier);
      droppedAsView = true;
    }
  }
  if (!droppedAsView) {
    restCatalog.dropTable(identifier);
  }
}

private void validateCurrentCatalog(String catName) {
Expand All @@ -149,7 +168,11 @@ private void validateCurrentCatalog(String catName) {
@Override
public boolean tableExists(String catName, String dbName, String tableName) {
  validateCurrentCatalog(catName);
  TableIdentifier identifier = TableIdentifier.of(dbName, tableName);
  // Check the table namespace first; only consult the view namespace when the
  // table lookup misses and the catalog actually supports views.
  if (restCatalog.tableExists(identifier)) {
    return true;
  }
  if (!(restCatalog instanceof ViewCatalog)) {
    return false;
  }
  return ((ViewCatalog) restCatalog).viewExists(identifier);
}

@Override
Expand Down Expand Up @@ -178,14 +201,22 @@ public Database getDatabase(String catName, String dbName) throws NoSuchObjectEx
@Override
public Table getTable(GetTableRequest tableRequest) throws TException {
  validateCurrentCatalog(tableRequest.getCatName());
  TableIdentifier id =
      TableIdentifier.of(tableRequest.getDbName(), tableRequest.getTblName());
  try {
    // Tables take precedence; a miss here may still be a view.
    org.apache.iceberg.Table icebergTable = restCatalog.loadTable(id);
    return MetastoreUtil.toHiveTable(icebergTable, conf);
  } catch (NoSuchTableException tableMissing) {
    if (restCatalog instanceof ViewCatalog) {
      try {
        View icebergView = ((ViewCatalog) restCatalog).loadView(id);
        return MetastoreUtil.toHiveView(icebergView, conf);
      } catch (NoSuchViewException viewMissing) {
        // Include the identifier so callers can tell WHAT was missing.
        throw new NoSuchObjectException(id + " table or view not found");
      }
    }
    throw new NoSuchObjectException(id + " table not found");
  }
}

@Override
Expand Down
Loading
Loading