Обмен технологиями

Apache Doris Руководство пользователя по быстрой установке Apache Hudi |

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

автор:SelectDB Техническая команда

Введение: Data Lakehouse сочетает в себе высокую производительность и производительность хранилища данных в режиме реального времени с низкой стоимостью и гибкостью озера данных, что помогает пользователям более удобно удовлетворять различные потребности в обработке и анализе данных. В предыдущих нескольких версиях Apache Doris продолжал углублять свою интеграцию с озером данных и превратился в зрелое интегрированное решение для озера и хранилища. Чтобы помочь пользователям быстро приступить к работе, мы представим руководство по построению интегрированной архитектуры озер и хранилищ для Apache Doris и различных основных форматов озер данных и систем хранения в серии статей, включая Hudi, Iceberg, Paimon, OSS, Delta Lake. , Kudu, BigQuery и т. д. Добро пожаловать, следите за обновлениями.

Являясь новой открытой архитектурой управления данными, Data Lakehouse сочетает в себе высокую производительность и производительность хранилищ данных в режиме реального времени с низкой стоимостью и гибкостью озер данных, помогая пользователям более удобно удовлетворять различные потребности в данных. Спрос на анализ обработки данных используется все чаще. в корпоративных системах больших данных.

За последние несколько версийАпач Дорис Оно продолжает углублять интеграцию с озером данных и теперь превратилось в зрелое интегрированное решение для озера и хранилища.

  • Начиная с версии 0.15, Apache Doris представила внешние таблицы Hive и Iceberg, пытаясь изучить возможность объединения с озером данных поверх Apache Iceberg.
  • Начиная с версии 1.2, Apache Doris официально представила функцию мультикаталога, которая реализовала автоматическое сопоставление метаданных и доступ к данным из нескольких источников данных, а также выполнила множество оптимизаций производительности при чтении внешних данных, выполнении запросов и т. д. и полностью работоспособна. построения Чрезвычайно быстро и легко использовать возможности архитектуры Lakehouse.
  • В версии 2.1 интегрированная архитектура хранилища озер Apache Doris была всесторонне усилена, что не только расширяет возможности чтения и записи основных форматов озера данных (Hudi, Iceberg, Paimon и т. д.), но также обеспечивает совместимость с несколькими диалектами SQL и возможность конвертации из оригинала. Система плавно переключается на Apache Doris. В области анализа данных и крупномасштабного чтения данных Дорис интегрирует интерфейс высокоскоростного чтения Arrow Flight, который повышает эффективность передачи данных в 100 раз.

Озеро и склад Апач Дорис в одном.png

Апачи Дорис + Апачи Худи

Апачи Худи В настоящее время это один из наиболее распространенных форматов открытых озер данных и платформа управления озером транзакционных данных, которая поддерживает множество основных механизмов запросов, включая Apache Doris.Апач Дорис Также были улучшены возможности чтения таблиц данных Apache Hudi:

  • Копировать при записи таблицы: Запрос моментального снимка
  • Объединение при чтении таблицы: запросы моментальных снимков, оптимизированные запросы чтения
  • Поддержка путешествий во времени
  • Поддержка добавочного чтения

Благодаря высокопроизводительному выполнению запросов Apache Doris и возможностям управления данными в реальном времени Apache Hudi можно обеспечить эффективный, гибкий и недорогой запрос и анализ данных. Он также обеспечивает мощный обратный поиск данных, аудит и инкрементную обработку. Функции в настоящее время основаны на Apache. Комбинация Doris и Apache Hudi была проверена и продвинута в реальных бизнес-сценариях несколькими пользователями сообщества:

  • Анализ и обработка данных в режиме реального времени : Общие сценарии, такие как анализ транзакций в финансовой отрасли, анализ потока кликов в реальном времени в рекламной индустрии и анализ поведения пользователей в индустрии электронной коммерции, требуют обновления данных в реальном времени и анализа запросов. Hudi может осуществлять обновление и управление данными в режиме реального времени, а также обеспечивать согласованность и надежность данных. Doris может эффективно обрабатывать крупномасштабные запросы к данным в режиме реального времени. Сочетание этих двух функций может полностью удовлетворить потребности в анализе и обработке данных в реальном времени. .

  • Отслеживание и аудит данных : Для таких отраслей, как финансы и здравоохранение, где предъявляются чрезвычайно высокие требования к безопасности и точности данных, возврат данных и аудит являются очень важными функциями. Hudi предоставляет функцию перемещения во времени, которая позволяет пользователям просматривать состояние исторических данных. В сочетании с эффективными возможностями запросов Apache Doris он может быстро искать и анализировать данные в любой момент времени для достижения точного обратного отслеживания и аудита.

  • Инкрементное чтение и анализ данных: При проведении анализа больших данных мы часто сталкиваемся с проблемами большого масштаба данных и частых обновлений. Hudi поддерживает инкрементальное чтение данных, что позволяет пользователям обрабатывать только изменяющиеся данные без необходимости одновременного обновления всего объема данных; Функции инкрементального чтения Apache Doris также могут сделать этот процесс более эффективным, значительно повышая эффективность обработки и анализа данных.

  • Федеративные запросы к источникам данных : Многие источники корпоративных данных сложны, и данные могут храниться в разных базах данных. Функция мультикаталога Doris поддерживает автоматическое сопоставление и синхронизацию нескольких источников данных, а также поддерживает объединенные запросы между источниками данных. Для предприятий, которым необходимо получать и интегрировать данные из нескольких источников данных для анализа, это значительно сокращает путь потока данных и повышает эффективность работы.

Эта статья познакомит читателей с тем, как быстро создать тестовую и демонстрационную среду для Apache Doris + Apache Hudi в среде Docker, а также продемонстрирует работу каждой функции, чтобы помочь читателям быстро приступить к работе.

руководство пользователя

Все скрипты и код, задействованные в этой статье, можно получить по этому адресу:https://github.com/apache/doris/tree/master/samples/datalake/hudi

01 Подготовка окружающей среды

Пример в этой статье развертывается с помощью Docker Compose. Компоненты и номера версий следующие:

Подготовка среды.png

02 Развертывание среды

  1. Создать сеть Docker
sudo docker network create -d bridge hudi-net
  • 1
  1. Запустите все компоненты
sudo ./start-hudi-compose.sh
  • 1
  1. После запуска вы можете использовать следующий сценарий для входа в командную строку Spark или командную строку Doris:
sudo ./login-spark.sh
sudo ./login-doris.sh
  • 1
  • 2

03 Подготовка данных

Затем сгенерируйте данные Hudi через Spark.Как показано в коде ниже, кластер уже содержит карту с именемcustomer Таблицу Hive, вы можете создать таблицу Hudi с помощью этой таблицы Hive:

-- ./login-spark.sh
spark-sql> use default;

-- create a COW table
spark-sql> CREATE TABLE customer_cow
USING hudi
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;

-- create a MOR table
spark-sql> CREATE TABLE customer_mor
USING hudi
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

04 Запрос данных

Как показано ниже, файл с именем hudi Каталог (доступен черезHOW CATALOGS Проверять). Ниже приводится заявление о создании каталога:

-- 已经创建,无需再次执行
CREATE CATALOG `hive` PROPERTIES (
    "type"="hms",
    'hive.metastore.uris' = 'thrift://hive-metastore:9083',
    "s3.access_key" = "minio",
    "s3.secret_key" = "minio123",
    "s3.endpoint" = "http://minio:9000",
    "s3.region" = "us-east-1",
    "use_path_style" = "true"
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. Вручную обновите Каталог и синхронизируйте созданную таблицу Hudi:
-- ./login-doris.sh
doris> REFRESH CATALOG hive;
  • 1
  • 2
  1. Использование Spark для работы с данными в Hudi можно увидеть в Doris в режиме реального времени без повторного обновления каталога. Вставляем строку данных в таблицы COW и MOR соответственно через Spark:
spark-sql> insert into customer_cow values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
spark-sql> insert into customer_mor values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
  • 1
  • 2
  1. Последние вставленные данные можно запросить напрямую через Doris:
doris> use hive.default;
doris> select * from customer_cow where c_custkey = 100;
doris> select * from customer_mor where c_custkey = 100;
  • 1
  • 2
  • 3
  1. Потом вставляешь через Spark c_custkey=32 Уже существующие данные, то есть перезапись существующих данных:
spark-sql> insert into customer_cow values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
spark-sql> insert into customer_mor values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
  • 1
  • 2
  1. Обновленные данные можно запросить через Doris:
doris> select * from customer_cow where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
doris> select * from customer_mor where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

05 Инкрементное чтение

Инкрементное чтение — это одна из функциональных функций, предоставляемых Hudi. С помощью инкрементального чтения пользователи могут получать инкрементные данные в указанном диапазоне времени, тем самым обеспечивая инкрементную обработку данных.Для этого Дорис может вставитьc_custkey=100 Запросите данные последующих изменений.Как показано ниже, мы вставилиc_custkey=32Данные:

doris> select * from customer_cow@incr('beginTime'='20240603015018572');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572');

doris> select * from customer_mor@incr('beginTime'='20240603015058442');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_mor', 'latest_state', '20240603015058442');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

06 Путешествие во времени

Doris поддерживает запрос данных Hudi указанной версии снимка, тем самым реализуя функцию перемещения данных во времени. Во-первых, вы можете запросить историю отправки двух таблиц Hudi через Spark:

spark-sql> call show_commits(table => 'customer_cow', limit => 10);
20240603033556094        20240603033558249        commit        448833        0        1        1        183        0        0
20240603015444737        20240603015446588        commit        450238        0        1        1        202        1        0
20240603015018572        20240603015020503        commit        436692        1        0        1        1        0        0
20240603013858098        20240603013907467        commit        44902033        100        0        25        18751        0        0

spark-sql> call show_commits(table => 'customer_mor', limit => 10);
20240603033745977        20240603033748021        deltacommit        1240        0        1        1        0        0        0
20240603015451860        20240603015453539        deltacommit        1434        0        1        1        1        1        0
20240603015058442        20240603015100120        deltacommit        436691        1        0        1        1        0        0
20240603013918515        20240603013922961        deltacommit        44904040        100        0        25        18751        0        0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Затем его можно выполнить через Дорис. c_custkey=32 , запросите снимок данных перед вставкой данных.Как вы можете видеть нижеc_custkey=32 Данные еще не обновлены:

Примечание. Синтаксис Time Travel в настоящее время не поддерживает новый оптимизатор, и его необходимо выполнить в первую очередь.set enable_nereids_planner=false;Отключите новый оптимизатор, в последующих версиях эта проблема будет исправлена.

doris> select * from customer_cow for time as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name             | c_address                             | c_phone         | c_acctbal | c_mktsegment | c_comment                                        | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
|        32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 |   3471.53 | BUILDING     | cial ideas. final, furious requests across the e |          15 |
|       100 | Customer#000000100 | jD2xZzi                               | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests              |          25 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
-- compare with spark-sql
spark-sql> select * from customer_mor timestamp as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;

doris> select * from customer_mor for time as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name             | c_address                             | c_phone         | c_acctbal | c_mktsegment | c_comment                                        | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
|       100 | Customer#000000100 | jD2xZzi                               | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests              |          25 |
|        32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 |   3471.53 | BUILDING     | cial ideas. final, furious requests across the e |          15 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
spark-sql> select * from customer_mor timestamp as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Оптимизация запросов

Данные в Apache Hudi можно условно разделить на две категории: базовые данные и дополнительные данные. Базовые данные обычно представляют собой объединенный файл Parquet, а инкрементные данные относятся к приращению данных, созданному с помощью INSERT, UPDATE или DELETE. Базовые данные можно считывать напрямую, а дополнительные данные необходимо считывать посредством слияния при чтении.

Для запросов к таблицам Hudi COW или запросов, оптимизированных для чтения таблиц MOR, данные представляют собой базовые данные, а файлы данных можно считывать непосредственно через встроенное средство чтения паркета Doris, что позволяет получить чрезвычайно быстрые ответы на запросы. Для получения дополнительных данных Дорис необходимо вызвать Java SDK Hudi через JNI для доступа.Чтобы добиться оптимальной производительности запросов, Apache Doris разделит данные запроса на две части: базовые и инкрементальные данные и прочитает их, используя вышеуказанные методы соответственно.

Чтобы проверить эту идею оптимизации, мы передали EXPLAIN оператор, чтобы увидеть, сколько базовых и дополнительных данных содержится в приведенном ниже запросе. Для таблицы COW все 101 сегмент данных являются базовыми данными (hudiNativeReadSplits=101/101 ), поэтому все таблицы COW можно читать непосредственно через Doris Parquet Reader, что позволяет добиться наилучшей производительности запросов. Для таблиц ROW большинство сегментов данных являются базовыми данными (hudiNativeReadSplits=100/101), количество сегментов представляет собой инкрементные данные, и это может в основном обеспечить лучшую производительность запросов.

-- COW table is read natively
doris> explain select * from customer_cow where c_custkey = 32;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_cow                                       |
|      predicates: (c_custkey[#5] = 32)                          |
|      inputSplitNum=101, totalFileSize=45338886, scanRanges=101 |
|      partition=26/26                                           |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=101/101                              |

-- MOR table: because only the base file contains `c_custkey = 32` that is updated, 100 splits are read natively, while the split with log file is read by JNI.
doris> explain select * from customer_mor where c_custkey = 32;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_mor                                       |
|      predicates: (c_custkey[#5] = 32)                          |
|      inputSplitNum=101, totalFileSize=45340731, scanRanges=101 |
|      partition=26/26                                           |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=100/101                              |
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Вы можете выполнить некоторые операции удаления через Spark, чтобы дополнительно наблюдать за изменениями в базовых и дополнительных данных Hudi:

-- Use delete statement to see more differences
spark-sql> delete from customer_cow where c_custkey = 64;
doris> explain select * from customer_cow where c_custkey = 64;

spark-sql> delete from customer_mor where c_custkey = 64;
doris> explain select * from customer_mor where c_custkey = 64;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Кроме того, секционирование также может выполняться с помощью условий секционирования, чтобы еще больше уменьшить объем данных и повысить скорость запросов.В следующем примере через условие разделаc_nationkey = 15 Выполните сокращение разделов, чтобы запросы запросов обращались только к одному разделу (partition=1/26) данные.

-- customer_xxx is partitioned by c_nationkey, we can use the partition column to prune data
doris> explain select * from customer_mor where c_custkey = 64 and c_nationkey = 15;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_mor                                       |
|      predicates: (c_custkey[#5] = 64), (c_nationkey[#12] = 15) |
|      inputSplitNum=4, totalFileSize=1798186, scanRanges=4      |
|      partition=1/26                                            |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=3/4                                  |
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Заключение

Выше приведено подробное руководство по быстрому созданию тестовой/демонстрационной среды на основе Apache Doris и Apache Hudi. В будущем мы также выпустим серию руководств по созданию интегрированной архитектуры озера и хранилища с использованием Apache Doris и различных основных озер данных. форматы и системы хранения, включая Iceberg, Paimon, OSS, Delta Lake и т. д., добро пожаловать на дальнейшее внимание.