[client] Move limit scan code to client's table api from flink.#2794
Conversation
48f4ca9 to
5847b71
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds a table-level batch scanning API to the Fluss client (to avoid Flink-side bucket iteration) and updates Flink limit pushdown to use the new client capability.
Changes:
- Introduce
Scan#createBatchScanner()and implement it inTableScanby building bucket scanners and combining them viaCompositeBatchScanner. - Update Flink
PushdownUtils.limitScanto use the new table-level batch scanner API. - Add unit/integration tests for
CompositeBatchScannerand table-level limit scans.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java |
Switch Flink limit scan implementation to use client table-level createBatchScanner() API. |
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java |
Add new table-level createBatchScanner() API to the public scan interface. |
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java |
Implement table-level batch scanner creation by enumerating buckets/partitions and composing scanners. |
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java |
New composite batch scanner to unify multiple per-bucket scanners behind one BatchScanner. |
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java |
New unit tests for composite scanner behavior (no-limit, limit, close). |
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java |
Rename IT case and add integration test for table-level scan with limit. |
Comments suppressed due to low confidence (1)
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java:343
- This test claims to verify “respects limit” but asserts actual.size() >= limit. From an API perspective, Scan.limit(N) should return at most N rows; allowing more will surprise callers (especially since BatchScanUtils.collectRows returns all rows from the scanner). Consider updating the implementation to cap results at limit and tighten this assertion to <= limit (and keep the existing <= 9 bound).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
5847b71 to
ad01dd3
Compare
wuchong
left a comment
There was a problem hiding this comment.
Thanks @loserwang1024 , I left some minor comments.
| // Ensure all scanners are closed on failure to avoid resource leaks | ||
| IOUtils.closeQuietly(scanner); | ||
| scannerQueue.forEach(IOUtils::closeQuietly); | ||
| scannerQueue.clear(); |
There was a problem hiding this comment.
Should this logic be moved to the close() method? If a fatal exception occurs, the scanner owner is responsible for manually invoking close(). Placing it here might be problematic if the exception is transient and eligible for retry.
| assertThat(actual.size()).isGreaterThanOrEqualTo(limit); | ||
| assertThat(actual.size()).isLessThanOrEqualTo(9); |
There was a problem hiding this comment.
Just assert the size should be equal to limit? The current assersion looks like the returned result is not determinist.
| while (batch.hasNext()) { | ||
| values.add(batch.next().getInt(0)); | ||
| } | ||
| assertThat(values.size()).isGreaterThanOrEqualTo(3); |
Purpose
Linked issue: close #2793
Brief change log
Tests
API and Format
Documentation