Skip to content

[fix,feat] Support MooncakeStore easy init#45

Merged
0oshowero0 merged 14 commits into
Ascend:mainfrom
0oshowero0:mooncake
Mar 16, 2026
Merged

[fix,feat] Support MooncakeStore easy init#45
0oshowero0 merged 14 commits into
Ascend:mainfrom
0oshowero0:mooncake

Conversation

@0oshowero0

@0oshowero0 0oshowero0 commented Mar 12, 2026

Copy link
Copy Markdown
Collaborator

New Features

  • Added MooncakeStore Configurations: Introduced related configuration options for MooncakeStore in config.py.
  • Easy Initialization: Implemented support for tq.init() when using the MooncakeStore backend.
  • E2E CI Coverage: Added end-to-end continuous integration tests specifically for the MooncakeStore backend.

Bug Fixes

  • KVStorageManager Check: Removed an outdated validation check in KVStorageManager that previously caused issues during put operations.
  • Metadata Update Tracking: Fixed a metadata update issue in TransferQueueController. Now, when a field transforms between a normal tensor and a nested tensor, the system correctly recomputes and updates the per_sample_shape, is_nested, and shape information.
  • ZMQ Related: Set recv_multipart(copy=False) by default.

Known Issues

  • Graceful Shutdown Limitations: We cannot gracefully shut down mooncake_master because the distributed TransferQueueClient holding MooncakeDistributedStore() will raise heartbeat error. As a workaround, we currently launch mooncake_master when setting auto_init=true but bypass shutting it down. To minimize possible influence, we call remove_all() to delete all the keys in mooncake_master.
  • Uneven BatchMeta Fields: TransferQueueController currently cannot handle non-uniform BatchMeta instances where samples do not have equal fields. This prevents key-value-based backends from accurately clearing all keys. In MooncakeStore, we are temporarily using remove_by_regex to mitigate this issue.
  • 1D Tensor Handling: When a user inputs a 1D tensor, previous refactoring populated an empty torch.Size([]) which could mislead key-value-based backends during zero-copy operations. Since these backends must perform fine-grained splits on the input TensorDict, distinguishing between 1D and 2D input tensors is difficult. We have now added a warning for this type of input and manually populate the shape with torch.Size([1]).
        # in AsyncTransferQueueClient.async_put()
        for field_name, field_data in data.items():
            if isinstance(field_data, torch.Tensor) and field_data.ndim == 1:
                logger.warning(
                    f"[{self.client_id}]: Data field '{field_name}' is a tensor with only one dimension. "
                    f"You may receive 2D tensors in key-value based backend."
                )

Configuration Reference

The config structure for MooncakeStore looks like this:

backend:
  # Pluggable storage/transport backend of TransferQueue. Choose from:
  # SimpleStorage, Yuanrong, MooncakeStore, ...
  storage_backend: MooncakeStore

  # For MooncakeStore:
  MooncakeStore:
    # Auto init metadata_server
    auto_init: true
    # Address of the HTTP metadata server
    metadata_server: localhost:50050
    # Address of master server
    master_server_address: localhost:50051
    # Address of local host
    local_hostname: localhost
    # Protocol for transmission. Choose from: tcp, rdma. (default: tcp)
    protocol: tcp
    # Memory segment size in bytes for mounting (default: 4GB)
    global_segment_size: 4294967296
    # Local buffer size in bytes (default: 1GB)
    local_buffer_size: 1073741824
    # Network device name. Set to "" to let Mooncake to auto-picks devices
    device_name: ""

CC:@zhaohaidao @dpj135 @mpb159753

Copilot AI review requested due to automatic review settings March 12, 2026 06:30
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the Mooncake storage integration by renaming the Mooncake client identifier/class, expanding MooncakeStore configuration defaults, and adding logic to start/stop a mooncake_master process when the MooncakeStore backend is selected.

Changes:

  • Rename Mooncake client registration/class/export from MooncakeStorageClient to MooncakeStoreClient.
  • Add MooncakeStore defaults to config.yaml and adjust Mooncake client config handling (buffer sizes, metadata URL formatting).
  • Update transfer_queue.interface to track SimpleStorage handles separately and to start/terminate a mooncake_master subprocess for MooncakeStore.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
transfer_queue/storage/managers/mooncake_manager.py Updates expected/default Mooncake client name in manager config validation.
transfer_queue/storage/clients/mooncake_client.py Renames client class/registration and adjusts config parsing (segment sizes, metadata server formatting).
transfer_queue/storage/clients/init.py Updates public export to the new Mooncake client class name.
transfer_queue/interface.py Adds MooncakeStore subprocess startup and revises storage handle tracking/cleanup.
transfer_queue/config.yaml Introduces a MooncakeStore config section with defaults and comments.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread transfer_queue/storage/clients/mooncake_client.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/config.yaml Outdated
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

2 similar comments
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@0oshowero0 0oshowero0 requested a review from Copilot March 12, 2026 10:52
@0oshowero0 0oshowero0 changed the title [feat] Support MooncakeStore one-click init [feat] Support MooncakeStore easy init Mar 12, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (1)

transfer_queue/storage/clients/mooncake_client.py:75

  • After validating metadata_server / master_server_address are non-None and of type str, the subsequent if self.metadata_server is None / if self.master_server_address is None checks are unreachable. Removing the dead checks will reduce confusion about which invariants the client actually enforces.
        if self.metadata_server is None or not isinstance(self.metadata_server, str):
            raise ValueError("Missing or invalid 'metadata_server' in config")
        if self.master_server_address is None or not isinstance(self.master_server_address, str):
            raise ValueError("Missing or invalid 'master_server_address' in config")

        if not self.metadata_server.startswith("http://") and not self.metadata_server.startswith("etcd://"):
            self.metadata_server = f"http://{self.metadata_server}/metadata"

        if self.metadata_server is None:
            raise ValueError("Missing 'metadata_server' in config")
        if self.master_server_address is None:
            raise ValueError("Missing 'master_server_address' in config")


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py
Comment thread transfer_queue/storage/clients/mooncake_client.py
Comment thread tests/e2e/test_kv_interface_e2e.py Outdated
Comment thread tests/e2e/test_e2e_lifecycle_consistency.py Outdated
Comment thread transfer_queue/interface.py
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix pre commit

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

1 similar comment
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@0oshowero0 0oshowero0 changed the title [feat] Support MooncakeStore easy init [fix,feat] Support MooncakeStore easy init Mar 13, 2026
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

2 similar comments
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

try: unify extract_field_schema

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

partially fix bugs

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix remove_samples

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix all ut

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +162 to +166
# Determine is_non_tensor: when first_item is None (empty field), cannot determine type
if first_item is None:
is_non_tensor = None
else:
is_non_tensor = not is_tensor
Comment thread transfer_queue/metadata.py Outdated
Comment on lines +173 to +178
assert value.shape[0] == batch_size
if len(value.shape) > 1:
# Multi-dim tensor: shape = value.shape[1:]
sample_shape = value.shape[1:]
else:
sample_shape = torch.Size([1])
Comment thread transfer_queue/controller.py
Comment thread transfer_queue/storage/clients/mooncake_client.py Outdated
Comment on lines +319 to +326
elif key == "MooncakeStore":
check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True)
if check.returncode == 0:
pids = check.stdout.strip().replace("\n", ", ")
logger.warning(
f"mooncake_master process still exists with PID: {pids}. "
f"Consider manually killing mooncake_master."
)
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

if first_item is None:
is_non_tensor = None
else:
is_non_tensor = not is_tensor
Comment thread transfer_queue/metadata.py Outdated
Comment on lines +607 to +609
raise RuntimeError(
"Fail to clear_data for key-value based backends due to lack of `field_names` in BatchMeta"
)
Comment thread transfer_queue/storage/clients/mooncake_client.py Outdated
Comment thread transfer_queue/interface.py Outdated
Comment thread transfer_queue/interface.py Outdated
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot

Copy link
Copy Markdown

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@0oshowero0 0oshowero0 merged commit cde575c into Ascend:main Mar 16, 2026
4 of 5 checks passed
0oshowero0 added a commit that referenced this pull request Mar 19, 2026
…ary copy and use `recv_multipart(copy=False)` by default (#46)

1. Use `recv_multipart(copy=False)` by default, which returns a writable
memory object in `zmq.Frame`
2. Remove redundant memory copy
3. Fix bugs introduced in #45 when dynamically update the `FieldMeta`
status.
---
Scripts to validate:

```python3
import zmq
import torch
import numpy as np
import multiprocessing
import time

# Import your serialization and deserialization interfaces
# Assuming the code you just wrote is saved in serial_utils.py in the same directory
from transfer_queue.utils.serial_utils import encode, decode

def sender():
    """Sender process"""
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:5557")
    
    # 1. Create data to send (containing a numpy array and a torch tensor)
    np_arr = np.ones((5, 5), dtype=np.float32)
    pt_tensor = torch.ones((5, 5), dtype=torch.float32)
    
    print(f"[Sender] Created NumPy Array, shape: {np_arr.shape}")
    print(f"[Sender] Created PyTorch Tensor, shape: {pt_tensor.shape}")
    
    # 2. Assemble into a complex nested structure for testing
    payload = {
        "metadata": {"version": "1.0", "description": "test zero-copy"},
        "data_np": np_arr,
        "data_pt": pt_tensor
    }
    
    # 3. Call your encode interface
    # encode returns a list[bytes], the first frame is msgpack, and subsequent frames are underlying memory views
    frames = encode(payload)
    print(f"[Sender] Serialization complete, generated {len(frames)} frames.")
    
    # 4. Send using multipart + zero-copy
    socket.send_multipart(frames, copy=False)
    print("[Sender] Data sending complete.")
    
    time.sleep(1) # Wait for receiver to process
    socket.close()
    context.term()

def receiver():
    """Receiver process"""
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:5557")
    
    # 1. Receive multiple frames with zero-copy
    # copy=False will make the returned result a list[zmq.Frame]
    frames = socket.recv_multipart(copy=False)
    print(f"\n[Receiver] Received {len(frames)} frames.")
    print(f"[Receiver] Frame types: {[type(f) for f in frames]}")
    
    # 2. Call your decode interface to deserialize
    # At this point, the passed frames are a list of zmq.Frame
    payload = decode(frames)
    
    recv_np = payload["data_np"]
    recv_pt = payload["data_pt"]
    
    print("\n--- Verify NumPy ---")
    print(f"[Receiver] NumPy object type: {type(recv_np)}, dtype: {recv_np.dtype}")
    print(f"[Receiver] Is NumPy memory writeable: {recv_np.flags.writeable}")
    try:
        recv_np[0, 0] = 99.0
        print(f"[Receiver] ✅ NumPy write successful! recv_np[0, 0] = {recv_np[0, 0]}")
    except Exception as e:
        print(f"[Receiver] ❌ NumPy write failed: {e}")
        
    print("\n--- Verify PyTorch Tensor ---")
    print(f"[Receiver] Tensor object type: {type(recv_pt)}, dtype: {recv_pt.dtype}")
    try:
        recv_pt[0, 0] = 88.0
        print(f"[Receiver] ✅ Tensor write successful! recv_pt[0, 0] = {recv_pt[0, 0].item()}")
    except Exception as e:
        print(f"[Receiver] ❌ Tensor write failed: {e}")

    socket.close()
    context.term()

if __name__ == '__main__':
    p_recv = multiprocessing.Process(target=receiver)
    p_send = multiprocessing.Process(target=sender)
    
    p_recv.start()
    time.sleep(0.5) # Ensure the receiver binds first
    p_send.start()
    
    p_send.join()
    p_recv.join()
```

---------

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 mentioned this pull request Mar 19, 2026
23 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants