netease / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.

Home Page: https://amoro.apache.org/

License: Apache License 2.0

Java 89.46% Thrift 0.17% JavaScript 0.34% HTML 0.01% Vue 2.36% TypeScript 0.97% Less 0.12% ANTLR 0.78% Scala 5.23% Dockerfile 0.12% Python 0.10% Shell 0.23% Smarty 0.10%
bigdata datalake lakehouse

amoro's Introduction

Amoro logo

Amoro (formerly known as Arctic) is a Lakehouse management system built on open data lake formats. Working with compute engines such as Flink, Spark, and Trino, Amoro brings pluggable, self-managed features to the Lakehouse to provide an out-of-the-box data warehouse experience, and helps data platforms and products easily build infra-decoupled, stream-and-batch-fused, lake-native architectures.

Architecture

Here is the architecture diagram of Amoro:

Amoro architecture

  • AMS: Amoro Management Service provides Lakehouse management features such as self-optimizing and data expiration. It also provides a unified catalog service for all compute engines and can be combined with existing metadata services.
  • Plugins: Amoro provides a wide selection of external plugins to meet different scenarios.
    • Optimizers: The self-optimizing execution engine plugin asynchronously performs merging, sorting, deduplication, layout optimization, and other operations on tables of all supported table formats.
    • Terminal: A SQL command-line tool with several implementations, such as local Spark and Kyuubi.
    • LogStore: Provides millisecond- to second-level SLAs for real-time data processing on top of message queues such as Kafka and Pulsar (see the sketch after this list).
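
As a rough illustration of how the LogStore is attached to a table, here is a minimal Flink SQL sketch using the log-store properties that also appear in the table DDL quoted in the issues further down this page; the catalog, database, and table names are placeholders, and the exact set of required properties depends on your deployment:

CREATE TABLE `arctic`.`db`.`log_table` (
  `id`   INT NOT NULL,
  `name` STRING,
  `ts`   TIMESTAMP(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'log-store.enable' = 'true',        -- route hot data through the LogStore (e.g. Kafka)
  'log-store.data-format' = 'json'    -- serialization format for LogStore records
);

Writes can then opt into the LogStore with the emit-mode hint shown in the Flink issue further down this page.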

Supported table formats

Amoro can manage tables of different table formats, similar to how MySQL/ClickHouse can choose different storage engines. Amoro meets diverse user needs by using different table formats. Currently, Amoro supports four table formats:

  • Iceberg format: Users can directly entrust their Iceberg tables to Amoro for maintenance, so that users can not only use all the functions of Iceberg tables, but also enjoy the performance and stability improvements brought by Amoro.
  • Mixed-Iceberg format: Amoro provides a set of formats, built on top of Iceberg, that are further optimized for streaming update scenarios. Users with high performance requirements for streaming updates, or who need CDC incremental data reading, can choose the Mixed-Iceberg format (see the sketch after this list).
  • Mixed-Hive format: Many users do not want to affect the business originally built on Hive while adopting data lakes. Amoro therefore provides the Mixed-Hive format, which upgrades Hive tables to the Mixed-Hive format through metadata migration alone, while the original Hive tables can still be used normally. This ensures business stability while gaining the advantages of data lake computing.
  • Paimon format: Amoro supports displaying metadata information in the Paimon format, including Schema, Options, Files, Snapshots, DDLs, and Compaction information.
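
For a concrete feel of the Mixed formats, here is a minimal Spark SQL sketch of creating a keyed Mixed-format table, based on the "using arctic" / "primary key" grammar that appears in the issues further down this page (database, table, and column names are made up):

create table db.sample (
  id int,                 -- primary key column
  name string,
  ts timestamp,
  primary key (id)        -- declares the table as a keyed table
) using arctic;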

Supported engines

Iceberg format

Iceberg format tables use the engine integration method provided by the Iceberg community. For details, please refer to: Iceberg Docs.
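
As a hedged example of that community integration, an Iceberg catalog can be registered in Flink SQL roughly as follows, assuming the Iceberg Flink runtime jar is on the classpath (the catalog name and warehouse path are placeholders):

CREATE CATALOG iceberg_hadoop WITH (
  'type' = 'iceberg',                               -- handled by the Iceberg Flink connector
  'catalog-type' = 'hadoop',                        -- Hadoop-warehouse catalog; Hive catalogs are also supported
  'warehouse' = 'hdfs://namenode:8020/warehouse'    -- placeholder path
);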

Mixed format

Amoro supports multiple processing engines for the Mixed format, as shown below:

Processing Engine | Version
Flink             | 1.15.x, 1.16.x, 1.17.x
Spark             | 3.1, 3.2, 3.3
Hive              | 2.x, 3.x
Trino             | 406

Supported operations per engine include batch read, batch write, batch overwrite, streaming read, streaming write, create table, and alter table; see the Amoro documentation for the exact capability matrix of each engine.

Features

  • Self-optimizing - Continuously optimizes tables by compacting small files, merging change files, and regularly deleting expired files, to maintain high query performance and reduce storage costs.
  • Multiple Formats - Supports different table formats such as Iceberg, Mixed-Iceberg, and Mixed-Hive to meet different scenario requirements, and provides unified management capabilities for all of them.
  • Catalog Service - Provides a unified catalog service for all compute engines, which can also be used with existing metadata stores such as Hive Metastore and AWS Glue.
  • Rich Plugins - Provides various plugins to integrate with other systems, such as continuous optimizing with Flink and data analysis with Spark and Kyuubi.
  • Management Tools - Provides a variety of management tools, including a web UI and a standard SQL command line, to help you get started faster and integrate with other systems more easily.
  • Infrastructure Independent - Can be easily deployed and used in private, cloud, hybrid-cloud, and multi-cloud environments.

Modules

Amoro contains the following modules:

  • amoro-core contains core abstractions and common implementations for other modules
  • amoro-ams is the Amoro Management Service module
    • ams-api contains the AMS Thrift API and common interfaces
    • ams-dashboard is the dashboard frontend for AMS
    • ams-server is the backend server for AMS
    • ams-optimizer provides the default optimizer implementation
  • amoro-mixed-format provides the Mixed format implementation
    • amoro-hive integrates with Apache Hive and implements the Mixed-Hive format
    • amoro-flink provides Flink connectors for Mixed format tables (use amoro-flink-runtime for a shaded version)
    • amoro-spark provides Spark connectors for Mixed format tables (use amoro-spark-runtime for a shaded version)
    • amoro-trino provides Trino connectors for Mixed format tables

Building

Amoro is built using Maven with Java 8, and Java 17 (required only for the mixed-format/trino module).

  • To build the Trino module, configure toolchains.xml in the ${user.home}/.m2/ directory with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<toolchains>
    <toolchain>
        <type>jdk</type>
        <provides>
            <version>17</version>
            <vendor>sun</vendor>
        </provides>
        <configuration>
            <jdkHome>${YourJDK17Home}</jdkHome>
        </configuration>
    </toolchain>
</toolchains>
  • To invoke a build and run tests: mvn package -P toolchain
  • To skip tests: mvn -DskipTests package -P toolchain
  • To package without the Trino module and its Java 17 dependency: mvn clean package -DskipTests -pl '!mixed-format/trino'
  • To build with Hadoop 2.x (the default is 3.x): mvn clean package -DskipTests -Dhadoop=v2
  • To specify the Flink version for the optimizer (the default is 1.18.1): mvn clean package -Dflink-optimizer.flink-version=1.15.4. If the Flink version is below 1.15.0, also add the -Pflink-pre-1.15 profile: mvn clean package -DskipTests -Pflink-pre-1.15 -Dflink-optimizer.flink-version=1.14.6

Spotless is skipped by default in the Trino module, so if you want to run the Spotless checks when building it, you must be in a Java 17 environment.

  • To build including the mixed-format/trino module in a Java 17 environment: mvn clean package -DskipTests -P trino-spotless
  • To build only mixed-format/trino and its dependent modules in a Java 17 environment: mvn clean package -DskipTests -P trino-spotless -pl 'mixed-format/trino' -am

Quickstart

Visit https://amoro.apache.org/quick-demo/ to quickly explore what Amoro can do.

Join Community

If you are interested in the Lakehouse or data lake formats, you are welcome to join our community. We welcome organizations, teams, and individuals to grow together with us, and we sincerely hope to help users make better use of data lake formats through open source.

Join the Amoro WeChat group: add "kllnn999" as a friend on WeChat and mention "Amoro lover".

Contributors

This project exists thanks to all the people who contribute.

Made with contrib.rocks.

amoro's Issues

[prepare for conference] UI optimization

  1. Align the top of the tables list with the top of the page, and make the row height of the cluster list the same as the row height of the top column on the right side of the page.
  2. Rename the optimizing tab to optimizers, and swap the order of the optimizers tab and the tables tab on the page.
  3. Widen the table-name column in the optimizers-tables list.
  4. Adjust the tab column position and tab spacing on the tables page and the optimizers page appropriately.

[Flink] [bug] LogStore serialization ArrayIndexOutOfBoundsException with the LOCALTIMESTAMP built-in function

insert into arctic.db.table /*+ OPTIONS('arctic.emit.mode'='log') */ select id, name, LOCALTIMESTAMP from source;

arctic.db.table table info:

Flink SQL> show create table log_table;
CREATE TABLE `arctic`.`db`.`log_table` (
  `id` INT NOT NULL,
  `name` VARCHAR(2147483647),
  `ts` TIMESTAMP(3),
  CONSTRAINT `41b0c2e1-dda2-468b-ad39-ccee3af21aec` PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'log-store.enable' = 'true',
  'log-store.data-format' = 'json',
  'table.create-timestamp' = '1659603126358',
  'format-version' = '2',
  'format' = 'json',
  ......

[spark 2.3] support create arctic datasource table

Support the create table grammar and the create table as select grammar.

Support specifying a primary key.

create table xxx (
  cols .... 
  primary key ( cols.... )
) using arctic 

note:

  • create the arctic table using the same table identifier as the spark table.
  • reuse the CreateTable logical plan and CreateDataSourceTableCommand

[Flink] Support create and get timestamp(3) timestamp(0) type in the arctic table

Scenario:
Create an iceberg table that has a timestamp-type column with a precision equal to 0 by Flink SQL.

Using the DDL below to create a watermark-definition table like the Iceberg table fails:
create table hadoop_catalog.db.dim (id int, name string, opt timestamp(0), primary key (id) not enforced) ;

create table arctic_dim ( watermark for opt as opt - interval '5' second) like hadoop_catalog.db.dim;

org.apache.flink.table.api.ValidationException: Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is TIMESTAMP(6)

TypeToFlinkType converts it to the Flink timestamp type with precision 6 by default.
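
A possible workaround (an unverified sketch, not part of the original issue) is to keep the LIKE clause but define the watermark on a computed column that casts the timestamp down to a supported precision:

create table arctic_dim (
  opt3 AS CAST(opt AS TIMESTAMP(3)),                 -- computed column with precision <= 3
  watermark for opt3 as opt3 - interval '5' second   -- watermark on the computed column
) like hadoop_catalog.db.dim;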

[spark3] support data frame api for keyed table

Support the v1 DataFrame API:

df.format("arctic").save("xxxx")
df.format("arctic").mode("overwrite").save("xxx")

val df = spark.read.format("arctic").load("xxx")

Support the v2 DataFrame API:

df.writeTo("xxx")
df.writeTo("xxx").overwritePartitions()

val df = spark.read.table("xxx")

[Bug][AMS] unit tests fail on Windows

Run the command below via Git Bash:

mvn install -pl '!trino' 

The unit tests fail:

Tests run: 2, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 7.277 sec <<< FAILURE!
orphanMetadataFileClean(com.netease.arctic.ams.server.optimize.TestOrphanFileClean)  Time elapsed: 3.959 sec  <<< FAILURE!
java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:87)
        at org.junit.Assert.assertTrue(Assert.java:42)
        at org.junit.Assert.assertFalse(Assert.java:65)
        at org.junit.Assert.assertFalse(Assert.java:75)
        at com.netease.arctic.ams.server.optimize.TestOrphanFileClean.orphanMetadataFileClean(TestOrphanFileClean.java:93)


....

Results :

Failed tests:   orphanMetadataFileClean(com.netease.arctic.ams.server.optimize.TestOrphanFileClean)
  orphanDataFileClean(com.netease.arctic.ams.server.optimize.TestOrphanFileClean)

Tests in error:
  testReleaseOptimizer(com.netease.arctic.ams.server.controller.OptimizerControllerTest)
(..)stGetTableList(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetCatalogs(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetDatabaseList(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetOptimizeInfo(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetTableTransactions(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetTransactionDetail(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetTablePartitions(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetPartitionFileListInfo(com.netease.arctic.ams.server.controller.TableControllerTest):
(..)stGetTableDetail(com.netease.arctic.ams.server.controller.TableControllerTest):

Tests run: 35, Failures: 2, Errors: 10, Skipped: 0


[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for arctic-parent 0.3.0:
[INFO]
[INFO] arctic-parent ...................................... SUCCESS [  1.843 s]
[INFO] arctic-ams ......................................... SUCCESS [  0.220 s]
[INFO] arctic-ams-api ..................................... SUCCESS [  1.438 s]
[INFO] arctic-core ........................................ SUCCESS [ 20.968 s]
[INFO] arctic-optimizer ................................... SUCCESS [ 28.756 s]
[INFO] arctic-spark ....................................... SUCCESS [  0.233 s]
[INFO] arctic-spark_3.1 ................................... SUCCESS [01:18 min]
[INFO] arctic-ams-dashboard ............................... SUCCESS [  0.818 s]
[INFO] arctic-ams-server .................................. FAILURE [01:27 min]
[INFO] arctic-flink ....................................... SKIPPED
[INFO] arctic-flink-format-1.12 ........................... SKIPPED
[INFO] arctic-flink-1.12 .................................. SKIPPED
[INFO] arctic-flink-runtime-1.12 .......................... SKIPPED
[INFO] arctic-spark_2.3 ................................... SKIPPED
[INFO] arctic-spark_3.1-runtime ........................... SKIPPED
[INFO] dist ............................................... SKIPPED
[INFO] arctic-hive ........................................ SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  03:40 min
[INFO] Finished at: 2022-08-04T10:16:55+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.12.4:test (default-test) on project arctic-ams-server: There are test failures.

[Feature][Flink] Introducing the INSERT OVERWRITE statement for mixed-streaming format tables.

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

Currently, the INSERT OVERWRITE statement is only supported for mixed-streaming format tables without a primary key. To provide the batch processing capability of the Flink engine on keyed tables, it should be supported for keyed tables as well.

Mixed-streaming format tables should include mixed-iceberg and mixed-hive format tables.

INSERT OVERWRITE [catalog_name.][db_name.]table_name [column_list] select_statement

column_list:
  (col_name1 [, column_name2, ...])

OVERWRITE

INSERT OVERWRITE will overwrite any existing data in the table or partition. Otherwise, new data is appended.

COLUMN LIST

Given a table T(a INT, b INT, c INT), Flink supports INSERT INTO T(c, b) SELECT x, y FROM S. The expectation is that ‘x’ is written to column ‘c’ and ‘y’ is written to column ‘b’ and ‘a’ is set to NULL (assuming column ‘a’ is nullable).
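
For context, a hedged sketch of what the statement could look like on a keyed mixed-format table once this lands (catalog, table, and column names are hypothetical):

SET 'execution.runtime-mode' = 'batch';   -- the feature targets Flink batch mode only

INSERT OVERWRITE arctic.db.keyed_table
SELECT id, name, ts FROM source_table;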

How should we improve?

The Flink connector should implement the SupportsOverwrite interface.

This feature only works in flink batch runtime mode.

Affected Flink versions: flink1.12/flink1.14/flink1.15.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Subtasks

No response

Code of Conduct

[Flink] Arctic table supports real-time dimension table join

Background:
There are some requirements for real-time data widening. Hive now supports lookup join, but this solution is not production-ready: the Hive table needs to be loaded into memory, and large tables are prone to OOM problems. Besides, neither Iceberg nor Hudi supports lookup joins.

Here is a summary proposal:
Flink provides the event-time temporal join. The right table will be used as a versioned table, and its data can be managed in RocksDB instead of in memory.

-- create a left table, using localtimestamp as event time.
create table source (
  ...,
  arctic_process_time AS LOCALTIMESTAMP,
  WATERMARK FOR arctic_process_time AS arctic_process_time
) with (...);

create table arctic_dim (...) with ('connector'='arctic', 'dim-table.enabled'='true');

select * from source as O left join arctic_dim FOR SYSTEM_TIME AS OF O.arctic_process_time as P on O.id = P.id;

The Arctic source will automatically create a custom watermark strategy if dim-table.enabled is true.

[spark3] support unkeyed table (adapt hive catalog)

Support insert into / insert overwrite for unkeyed tables adapted to the Hive catalog.

Unkeyed tables under Hive cannot reuse the Iceberg SparkTable code; make ArcticSparkTable wrap HiveUnkeyedTable and support readers/writers for the table.

[Improvement][Flink] Adding support for setting/altering Arctic table properties

Search before asking

What would you like to be improved?

Enhance the ability of Flink SQL to operate on Arctic tables, allowing users to modify table properties through the ALTER TABLE syntax.
Set or Alter Table Properties

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

Set one or more properties on the specified table. If a particular property is already set on the table, the new value overrides the old one.
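
For example, a sketch of the proposed syntax using the log-store properties that appear elsewhere on this page (the table name is hypothetical):

ALTER TABLE arctic.db.log_table SET (
  'log-store.enable' = 'true',
  'log-store.data-format' = 'json'
);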

How should we improve?

  • Implement the ArcticCatalog.alterTable method.
  • Invoke the AMS API to alter the properties.
  • Add some UTs with the Flink Catalog API and Flink SQL.
  • Affected Flink versions: flink1.12/flink1.14/flink1.15.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Subtasks

Code of Conduct

[spark2.3] support arctic table scan and mor

Add the Spark conf below:

conf.set("arctic.catalog.url", "thrift://xxxx/catalog_name")
conf.set("spark,sqk,arctic.delegate.enable", "true" )
conf.set("spark.sql.extensions" , "com.netease.arctic.spark.ArcticSparkSessionExtensions")

note: new spark conf

  • arctic.catalog.url: the default Arctic catalog URL used to look up table metadata.
  • spark.sql.arctic.delegate.enable: set to false to disable Arctic delegation; this is useful when Arctic is deployed alongside many existing Hive tables.

Support merge-on-read for keyed tables.

[spark3] create table as select support primary key

Support the grammar:

create table db.sample (
    xxx ....  // columns 
   primary key( .... )
) using arctic  as select 

Reuse CreateTableAsSelectStatement to adapt to other extensions like Ranger.

note:

  • The create table as select action writes data to the keyed table's base store.

[spark2.3] support arctic dataframe api

df.format("arctic").save("xxxx")
df.format("arctic").mode("overwrite").save("xxx")

val df = spark.read.format("arctic").load("xxx")

Keep the same behavior as Spark 3.
