Change Data Capture for Apache Phoenix Stream

Apache Phoenix is an open-source SQL skin over Apache HBase that enables lightning-fast OLTP (Online Transaction Processing) operations on petabytes of data using standard SQL queries. Phoenix combines the scalability of NoSQL with the familiarity and power of SQL: the best of both worlds.

Apache Phoenix provides Change Data Capture (CDC) as of PHOENIX-7001. The CDC design in Phoenix leverages the write-optimized Uncovered Index and the Max Lookback feature. Changes are captured as a time-ordered sequence of row-level modification events.

Problem Statement

Because the CDC uses an uncovered index on PHOENIX_ROW_TIMESTAMP(), the CDC consumer must provide the time range for which records are to be read from the CDC index. With this approach, an HBase table region split has no impact on consumer queries: the RegionServer hosting the index table scans as many data table regions as were involved in data modifications during the given time range. As a result, the consumer has no way to consume only the change events of a specific table region (partition). This matters especially for cloud-native applications that consume Change Stream records, because they need to determine how many compute resources (memory, CPU, IO, etc.) to allocate according to the number of data table regions involved in the given time range. That number changes over time: when a region grows beyond a certain limit, HBase considers splitting it according to the split policy. The default split policy is based on region size growth; for instance, regions are not allowed to grow beyond 10 GB by default.

Solution

The solution required a new framework that introduces streaming concepts to Phoenix CDC. It provides one active stream for each table on which the consumer has enabled CDC.

Change Stream: Phoenix Stream captures a time-ordered sequence of row-level modifications in any table and stores this information in a log for up to the TTL window (24 hours by default). Client applications can access this log and view the changes in near-real time, optionally including how the data appeared before and after each row was modified.

Stream Partitions: Stream records are organized into groups, or partitions. Each partition acts as a container for multiple stream records and contains information required for accessing and iterating through these records. The stream records within a partition are removed automatically after the TTL window.

Create CDC on Phoenix Table:

When CDC is created on a table, Phoenix CDC Stream creates an uncovered index on the data table, keyed on (PARTITION_ID(), PHOENIX_ROW_TIMESTAMP()).

  • PARTITION_ID(): Server-side function that returns the encoded name of the HBase region to which the data modification is applied. This value serves as the partition id for the data table region. It is a 32-byte string.
  • PHOENIX_ROW_TIMESTAMP(): Function to retrieve the row update timestamp from the empty column cell.

A partition of an index is a logical partition defined by PARTITION_ID(). An index region may have rows from one or more index partitions. Since PARTITION_ID() will be the index row key prefix, the rows of a given partition will be laid out on the index table consecutively. When a data table region splits into two daughter regions, the parent region gets archived and no longer receives any mutations. The new daughter regions start receiving new mutations and hence the mutations are recorded in the order of their arrival for the new partition that aligns with new daughter regions.
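
The contiguous layout described above can be illustrated with a small sketch. It models index row keys as (partition id, timestamp) tuples and relies on plain tuple sorting, which is only an approximation of how HBase orders row keys byte by byte. The partition ids, timestamps, and row names are made-up values; real partition ids are encoded region names.

```python
from itertools import groupby

# Hypothetical index rows: (partition_id, row_timestamp_ms, data_row_key).
# Partition ids are shortened for readability.
index_rows = [
    ("b7e1", 1700000000300, "row-9"),
    ("a3f0", 1700000000100, "row-1"),
    ("b7e1", 1700000000050, "row-7"),
    ("a3f0", 1700000000200, "row-2"),
]

# Sorting tuples compares the partition id first, then the timestamp,
# which mimics the effect of using PARTITION_ID() as the row key prefix.
sorted_rows = sorted(index_rows)

# Each partition now occupies one consecutive, time-ordered run of rows.
runs = {
    pid: [ts for _, ts, _ in rows]
    for pid, rows in groupby(sorted_rows, key=lambda r: r[0])
}

for pid, timestamps in runs.items():
    print(pid, timestamps)
```

Because every row of a partition shares the same key prefix, a scan for one partition over a time range touches a single consecutive slice of the index rather than rows scattered across it.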

Each partition of the index table corresponds to one partition of the given stream. Partitions fall into two categories:

  1. Open partitions: Any partition whose corresponding data table region is currently active is considered an open partition. The data table region can continue to serve read/write requests until it is split into two daughter regions or merged with other regions into one region.
  2. Closed partitions: Any partition whose corresponding data table region is no longer alive, because it was split or merged into new region(s) and is either ready to be archived or already archived, is considered a closed partition. The data table region can no longer serve read/write requests.

Both open and closed partitions can contain records if the corresponding live and archived regions received data modifications. Both open and closed partitions can be consumed such that the data are read from the uncovered index with optional pre-image and post-image modifications. The records will no longer be available after the TTL expiry defined on the CDC Stream index table.

Records within a partition are identified by the numerical timestamp returned by PHOENIX_ROW_TIMESTAMP(), which increases with new mutations. The timestamp is used to retrieve the records of a given partition starting from a specific position. This lets the consumer resume scanning an open partition from the point it last reached, after it has consumed all records that were previously available.
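
As a rough illustration of resuming from a timestamp position, the sketch below keeps the records of one partition in a time-ordered list and reads from a checkpoint. The function name `read_from`, the record contents, and the checkpointing rule are assumptions made for the example, not part of Phoenix.

```python
from bisect import bisect_left

def read_from(partition_records, start_ts, limit):
    """Return up to `limit` records with timestamp >= start_ts.

    `partition_records` must be sorted by timestamp, as the rows of one
    partition are in the index.
    """
    timestamps = [ts for ts, _ in partition_records]
    start = bisect_left(timestamps, start_ts)
    return partition_records[start:start + limit]

# Hypothetical change records of one open partition: (timestamp_ms, change).
records = [(100, "insert k1"), (105, "update k1"), (120, "delete k2")]

first_batch = read_from(records, start_ts=0, limit=2)

# Assumed checkpoint rule: resume just past the last consumed timestamp.
# This is only safe if no two records share that timestamp; otherwise
# resume at the same timestamp and skip records already seen.
checkpoint = first_batch[-1][0] + 1

records.append((130, "insert k3"))  # a new mutation arrives later
next_batch = read_from(records, start_ts=checkpoint, limit=10)
print(next_batch)
```

The consumer only needs to persist one number per partition to pick up where it left off, which is what makes the timestamp a convenient position marker.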

It is also important to store parent to child relationships among the partitions as the regions get split and/or merged.

Partition Records

CDC Stream Metadata

The metadata related to all open and closed partitions for the given stream are persisted in new system tables: SYSTEM.CDC_STREAM and SYSTEM.CDC_STREAM_STATUS.

The SYSTEM.CDC_STREAM_STATUS table tracks the status of CDC streams for tables that have CDC enabled. It maintains the lifecycle state of each CDC stream.

Columns:

  • TABLE_NAME (VARCHAR) – Name of the table that has CDC enabled
  • STREAM_NAME (VARCHAR) – Unique identifier for the CDC stream
  • STREAM_STATUS (VARCHAR) – Current status of the stream
  • STREAM_TYPE (VARCHAR) – Type of the stream
Primary Key: (TABLE_NAME, STREAM_NAME)

The SYSTEM.CDC_STREAM table stores partition metadata for CDC streams. It tracks information about the table regions/partitions that are part of the CDC stream, including partition boundaries and lifecycle information.

Columns:

  • TABLE_NAME (VARCHAR) – Name of the table that has CDC enabled
  • STREAM_NAME (VARCHAR) – Unique identifier for the CDC stream
  • PARTITION_ID (VARCHAR) – Unique identifier for the partition (HBase region encoded name)
  • PARENT_PARTITION_ID (VARCHAR) – ID of the parent partition (used for region splits/merges)
  • PARTITION_START_TIME (BIGINT) – Timestamp when this partition became active
  • PARTITION_END_TIME (BIGINT) – Timestamp when this partition was split/merged (NULL if active)
  • PARTITION_START_KEY (VARBINARY_ENCODED) – Start row key of the partition
  • PARTITION_END_KEY (VARBINARY_ENCODED) – End row key of the partition
  • PARENT_PARTITION_START_TIME (BIGINT) – Start time of the parent partition
Primary Key: (TABLE_NAME, STREAM_NAME, PARTITION_ID, PARENT_PARTITION_ID)
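
To make the partition metadata more concrete, here is a minimal sketch that models a few SYSTEM.CDC_STREAM rows in memory, classifies partitions as open or closed from PARTITION_END_TIME, and builds the parent-to-child map. The `PartitionRow` type and the sample values are hypothetical, and representing a partition that has no parent with `None` is an assumption of this sketch. Sets are used because PARENT_PARTITION_ID is part of the primary key, so a partition produced by a merge could presumably appear in one row per parent.

```python
from collections import defaultdict
from typing import NamedTuple, Optional

class PartitionRow(NamedTuple):
    partition_id: str
    parent_partition_id: Optional[str]  # None: no parent (assumption)
    start_time: int
    end_time: Optional[int]  # None models a NULL PARTITION_END_TIME

# Hypothetical rows: region "p1" split into "c1" and "c2" at time 500.
rows = [
    PartitionRow("p1", None, 100, 500),
    PartitionRow("c1", "p1", 500, None),
    PartitionRow("c2", "p1", 500, None),
]

# A NULL end time means the partition is still active, i.e. open.
open_ids = sorted({r.partition_id for r in rows if r.end_time is None})
closed_ids = sorted({r.partition_id for r in rows if r.end_time is not None})

# Parent-to-child relationships, used to move on to the daughter
# partitions once a closed parent has been fully consumed.
children = defaultdict(set)
for r in rows:
    if r.parent_partition_id is not None:
        children[r.parent_partition_id].add(r.partition_id)

print(open_ids, closed_ids, dict(children))
```

A consumer that finishes a closed partition can look up its children in this map and continue with them, which keeps the events of a key range in order across a split.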

Change Stream Records Consumption

To start consuming the stream records, a consumer follows these steps:

Step 1: Get the change stream for the given table

SELECT STREAM_NAME
FROM SYSTEM.CDC_STREAM_STATUS
WHERE TABLE_NAME = ? AND STREAM_STATUS = 'ACTIVE'

Step 2: Get the partitions for the given table

SELECT PARTITION_ID, PARENT_PARTITION_ID, PARTITION_START_TIME, PARTITION_END_TIME 
FROM SYSTEM.CDC_STREAM 
WHERE TABLE_NAME = ? AND STREAM_NAME = ?

Step 3: Consume the stream records from the given partition

SELECT /*+ CDC_INCLUDE(PRE, POST) */ * 
FROM  
WHERE PARTITION_ID() = ?
AND PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(? AS BIGINT) AS TIMESTAMP)
LIMIT ?
OFFSET ?

The PARTITION_ID() value comes from the result of step 2. Step 2 is repeated when an HBase region splits into daughter regions: the corresponding partition closes, so the metadata for the new child partitions must be queried again in step 2.

The PHOENIX_ROW_TIMESTAMP() value is provided as an epoch timestamp. LIMIT and OFFSET are optional and let consumers paginate through the change stream records of the given partition.
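
The pagination pattern can be sketched as a simple loop. In the example below, `fetch_page` is a hypothetical in-memory stand-in for running the step 3 query with its four bind parameters; a real consumer would execute the SQL through a Phoenix client instead. The record layout and page size are likewise assumptions.

```python
def fetch_page(records, partition_id, start_ts, limit, offset):
    """Stand-in for the step 3 query: filter, then apply OFFSET and LIMIT."""
    matching = [
        r for r in records
        if r["partition_id"] == partition_id and r["ts"] >= start_ts
    ]
    return matching[offset:offset + limit]

def consume_partition(records, partition_id, start_ts, page_size):
    """Yield every matching record, fetching one page at a time."""
    offset = 0
    while True:
        page = fetch_page(records, partition_id, start_ts, page_size, offset)
        yield from page
        if len(page) < page_size:  # short page: nothing left for now
            break
        offset += page_size

# Hypothetical stream records, already time-ordered per partition.
stream = [{"partition_id": "a3f0", "ts": 100 + i} for i in range(5)]
stream.append({"partition_id": "b7e1", "ts": 101})

consumed = [
    r["ts"] for r in consume_partition(stream, "a3f0", 101, page_size=2)
]
print(consumed)
```

For an open partition, a short page only means no further records exist yet, so a consumer would typically record the last timestamp it saw and poll again later from that position.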
