Add a batch scanner that can be used directly for the whole table #2793

Description

@loserwang1024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Iceberg & Arrow's Behavior

Apache Arrow:

import pyarrow as pa

# Build a small example table
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Iterate over data in batches
for batch in table.to_batches():
    print(batch.to_pandas())

Apache Iceberg:

org.apache.iceberg.Scanner<Record> scanner = icebergTable.newScan().limit(100).build();
scanner.forEach(record -> ...);

Both libraries provide an efficient way to iterate over large datasets using batched access.

❌ Limitation in Fluss

In Fluss, if you want to perform a table-level scan with LIMIT or projection, you have to manually iterate through each bucket and create individual scanners for each one, even if the table is not partitioned.

Example current workaround:

try (Connection connection = ConnectionFactory.createConnection(flussConfig)) {
    Table table = connection.getTable(tablePath);
    Admin flussAdmin = connection.getAdmin();

    // Get table info and generate list of buckets
    TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
    int bucketCount = tableInfo.getNumBuckets();
    List<TableBucket> tableBuckets;
    if (tableInfo.isPartitioned()) {
        // Partitioned table: enumerate every bucket of every partition
        List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
        tableBuckets =
                partitionInfos.stream()
                        .flatMap(
                                partitionInfo ->
                                        IntStream.range(0, bucketCount)
                                                .mapToObj(
                                                        bucketId ->
                                                                new TableBucket(
                                                                        tableInfo.getTableId(),
                                                                        partitionInfo.getPartitionId(),
                                                                        bucketId)))
                        .collect(Collectors.toList());
    } else {
        // Non-partitioned table: enumerate the buckets directly
        tableBuckets =
                IntStream.range(0, bucketCount)
                        .mapToObj(bucketId -> new TableBucket(tableInfo.getTableId(), bucketId))
                        .collect(Collectors.toList());
    }

    // Create one batch scanner per bucket
    Scan scan = table.newScan().limit(limit).project(projectedFields);
    List<BatchScanner> scanners =
            tableBuckets.stream()
                    .map(scan::createBatchScanner)
                    .collect(Collectors.toList());

    List<InternalRow> scannedRows = BatchScanUtils.collectLimitedRows(scanners, limit);
}

This approach is not intuitive: users have to know about buckets and partitions, and enumerate them by hand, just to read a table.

Proposed Solution

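I propose adding a table-level createBatchScanner() method that takes no bucket argument and scans the whole table, so that bucket and partition enumeration is hidden from the user, similar to how Iceberg and Arrow expose table-level scans.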

✅ Expected Usage

Table table = connection.getTable(tablePath);

// Create a batch scanner that can be used directly for the whole table
BatchScanner batchScanner =
    table.newScan()
         .project(projectedFields)
         .limit(limit)
         .createBatchScanner(); // <-- New API

// Iterate over the rows
while (batchScanner.hasNext()) {
    InternalRow row = batchScanner.next();
    // process row
}

// or get all the rows at once
List<InternalRow> actualRows = BatchScanUtils.collectRows(batchScanner);
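
To make the idea more concrete, here is a minimal, self-contained sketch of how a table-level scanner could wrap the per-bucket scanners and apply the limit across all of them. BucketScanner and WholeTableScanner are hypothetical names used only for illustration; they are not Fluss classes, and the sketch reads buckets one after another, whereas a real implementation would presumably build on the existing per-bucket createBatchScanner(TableBucket) and might read buckets concurrently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a per-bucket scanner; NOT the Fluss BatchScanner interface. */
interface BucketScanner<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close();
}

/** Hypothetical table-level scanner: drains bucket scanners in order, up to a global row limit. */
final class WholeTableScanner<T> implements Iterator<T>, AutoCloseable {
    private final List<BucketScanner<T>> bucketScanners;
    private final long limit;
    private int current = 0;   // index of the bucket scanner being drained
    private long emitted = 0;  // rows returned so far, across all buckets

    WholeTableScanner(List<BucketScanner<T>> bucketScanners, long limit) {
        this.bucketScanners = new ArrayList<>(bucketScanners);
        this.limit = limit;
    }

    @Override
    public boolean hasNext() {
        if (emitted >= limit) {
            return false; // global limit reached, stop even if buckets still have rows
        }
        // Skip bucket scanners that are already exhausted (or empty).
        while (current < bucketScanners.size() && !bucketScanners.get(current).hasNext()) {
            current++;
        }
        return current < bucketScanners.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        emitted++;
        return bucketScanners.get(current).next();
    }

    @Override
    public void close() {
        // Close every underlying bucket scanner, including ones never read.
        for (BucketScanner<T> scanner : bucketScanners) {
            scanner.close();
        }
    }

    /** Illustration helper: wraps an in-memory list of rows as a bucket scanner. */
    static <T> BucketScanner<T> ofRows(List<T> rows) {
        Iterator<T> it = rows.iterator();
        return new BucketScanner<T>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public T next() { return it.next(); }
            @Override public void close() { }
        };
    }
}
```

With something like this behind createBatchScanner(), the caller only sees a single scanner and a single limit; which buckets and partitions exist stays an internal detail.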

Anything else?

No response

Willingness to contribute

  • I'm willing to submit a PR!
