import ast
import base64
import os
from io import BytesIO
from typing import List, Union

import numpy as np
from lmdeploy import ChatTemplateConfig, TurbomindEngineConfig, pipeline
from lmdeploy.vl.constants import IMAGE_TOKEN
from PIL import Image, ImageFile
from scipy.spatial.transform import Rotation
def normalize_quaternion(quat):
    """Scale quaternion(s) to unit length.

    Args:
        quat: Array-like whose last axis holds the quaternion components.
            A single quaternion or a batch of quaternions is accepted.

    Returns:
        ``np.ndarray`` shaped like ``quat`` in which every vector along the
        last axis has been divided by its Euclidean norm.
    """
    components = np.array(quat)
    # keepdims=True keeps the norm broadcastable against the input.
    magnitude = np.linalg.norm(quat, axis=-1, keepdims=True)
    return components / magnitude
def quaternion_to_discrete_euler(quaternion, bins_num=256):
    """Convert quaternion(s) into per-axis discrete Euler bin indices.

    The rotation is expressed as 'xyz' Euler angles in degrees, shifted by
    +180 so that index 0 corresponds to -180 degrees, and then rounded to the
    nearest multiple of ``360 / bins_num``.

    Args:
        quaternion: Quaternion(s) accepted by ``Rotation.from_quat``.
        bins_num: Number of bins covering the full 360-degree range.

    Returns:
        Integer ``np.ndarray`` of bin indices in ``[0, bins_num - 1]``.
    """
    bin_width = 360 / bins_num
    shifted_deg = Rotation.from_quat(quaternion).as_euler('xyz', degrees=True) + 180
    indices = np.around(shifted_deg / bin_width).astype(int)
    # +180 degrees rounds to index ``bins_num``; it is the same angle as
    # -180 degrees, so wrap it back to index 0.
    return np.where(indices == bins_num, 0, indices)
def discrete_euler_to_quaternion(discrete_euler, bins_num=256):
    """Convert discrete Euler bin indices back into quaternion(s).

    Each index is mapped to ``index * (360 / bins_num) - 180`` degrees and the
    resulting angles are interpreted as 'xyz' Euler angles.

    Args:
        discrete_euler: Array-like of bin indices (list, tuple or
            ``np.ndarray``).
        bins_num: Number of bins covering the full 360-degree range.

    Returns:
        Quaternion(s) as returned by ``Rotation.as_quat``.
    """
    resolution = 360 / bins_num
    # Coerce to an array first: multiplying a plain list/tuple by a float
    # raises TypeError, whereas array-likes should be accepted here.
    euler = np.asarray(discrete_euler) * resolution - 180
    return Rotation.from_euler('xyz', euler, degrees=True).as_quat()
class RotationActionDiscretizer:
    """Map rotations to and from per-axis discrete Euler bin indices."""

    def __init__(self, bins_num=256, min_action=-1, max_action=1):
        """Create a rotation discretizer.

        Note: the canonical input action is a quaternion.

        Args:
            bins_num: Number of bins to discretize the rotation space into.
            min_action: Accepted but not used by this class.
            max_action: Accepted but not used by this class.
        """
        self.bins_num = bins_num

    def discretize(self, action: Union[np.ndarray, List[float]], degrees=False):
        """Return the discrete Euler indices for ``action``.

        Args:
            action: A length-4 input is treated as a quaternion; anything
                else is treated as 'xyz' Euler angles.
            degrees: Whether Euler-angle input is in degrees (ignored for
                quaternion input).
        """
        if len(action) == 4:
            quat = action
        else:
            quat = Rotation.from_euler('xyz', action, degrees=degrees).as_quat()
        unit_quat = normalize_quaternion(quat)
        return quaternion_to_discrete_euler(unit_quat, bins_num=self.bins_num)

    def undiscretize(self, discrete_action):
        """Return the normalized quaternion for the given bin indices."""
        quat = discrete_euler_to_quaternion(discrete_action, bins_num=self.bins_num)
        return normalize_quaternion(quat)

    def get_action_space(self):
        """Return the number of bins."""
        return self.bins_num

    def generate_discrete_special_tokens(self) -> List[str]:
        """Return the tokens ``<rot0>`` ... ``<rot{bins_num - 1}>``."""
        return [f"<rot{index}>" for index in range(self.bins_num)]

    def map_4d_quaternion_to_special_tokens(self, action) -> List[str]:
        """Discretize a quaternion and render each index as a ``<rotN>`` token."""
        indices = self.discretize(action)
        return [f"<rot{index}>" for index in indices]

    def map_roll_pitch_yaw_to_special_tokens(
        self, roll_pitch_yaw: Union[np.ndarray, List[float]], degrees=False
    ) -> List[str]:
        """Discretize roll/pitch/yaw and render each index as a ``<rotN>`` token."""
        indices = self.discretize(roll_pitch_yaw, degrees)
        return [f"<rot{index}>" for index in indices]
class TranslationActionDiscretizer:
    """Uniformly discretize continuous translation values into bin indices."""

    def __init__(self, bins_num=256, min_action=-1, max_action=1):
        """Create a translation discretizer.

        Args:
            bins_num: Number of bin edges spread evenly over
                ``[min_action, max_action]``.
            min_action: Lower bound; smaller inputs are clipped to it.
            max_action: Upper bound; larger inputs are clipped to it.
        """
        self.bins_num = bins_num
        self.min_action = min_action
        self.max_action = max_action
        # ``bins_num`` edges give ``bins_num - 1`` intervals (and centers).
        self.bins = np.linspace(min_action, max_action, bins_num)
        self.bin_centers = (self.bins[:-1] + self.bins[1:]) / 2.0

    def discretize(self, action: np.ndarray):
        """Clip ``action`` to the valid range and return its bin indices.

        Returns:
            Indices produced by ``np.digitize`` against ``self.bins``; after
            clipping they lie in ``[1, bins_num]``.
        """
        clipped = np.clip(action, a_min=float(self.min_action), a_max=float(self.max_action))
        return np.digitize(clipped, self.bins)

    def undiscretize(self, discrete_action):
        """Map bin indices back to the centers of their bin intervals.

        NOTE: actions are discretized w.r.t. the bin edges (not the bin
        centers), so digitization returns indices in [1, # bins], inclusive,
        while there are only (# bins - 1) bin intervals. The last possible
        index is therefore mapped onto the last bin interval.

        EXAMPLE: if ``self.bins`` has 256 values, ``self.bin_centers`` has
        255. Digitization returns indices in [1, 256]; subtracting 1 gives
        [0, 255]. Index 255 would be out of bounds for ``self.bin_centers``,
        so the shifted indices are clipped to [0, 255 - 1].

        Args:
            discrete_action: Integer bin index or array-like of indices
                (list, tuple or ``np.ndarray``).

        Returns:
            The corresponding bin-center value(s).
        """
        # np.asarray allows plain lists/tuples; ``list - 1`` raises TypeError.
        shifted = np.asarray(discrete_action) - 1
        center_index = np.clip(shifted, a_min=0, a_max=self.bin_centers.shape[0] - 1)
        undiscretized_action = self.bin_centers[center_index]
        return np.clip(undiscretized_action, self.min_action, self.max_action)

    def get_action_space(self):
        """Return the number of bins."""
        return self.bins_num

    def generate_discrete_special_tokens(self) -> List[str]:
        """Return the tokens ``<loc0>`` ... ``<loc{bins_num - 1}>``."""
        return [f"<loc{i}>" for i in range(self.bins_num)]

    def map_3d_action_to_special_tokens(self, action) -> List[str]:
        """Discretize ``action`` and render each index as a ``<locN>`` token.

        NOTE(review): ``discretize`` yields indices in [1, bins_num], whereas
        ``generate_discrete_special_tokens`` covers [0, bins_num - 1]; an
        input at ``max_action`` therefore produces ``<loc{bins_num}>``. Left
        unchanged because trained models may rely on this encoding - confirm.
        """
        discretized_action = self.discretize(action)
        return [f"<loc{index}>" for index in discretized_action]
class OpennessActionDiscretizer:
    """Uniformly discretize gripper openness values into bin indices."""

    def __init__(self, bins_num=256, min_openness=0, max_openness=1):
        """Create an openness discretizer.

        Args:
            bins_num: Number of bins to discretize the openness space into.
            min_openness: Minimum openness of the gripper.
            max_openness: Maximum openness of the gripper.
        """
        self.bins_num = bins_num
        self.min_openness = min_openness
        self.max_openness = max_openness
        # ``bins_num`` edges give ``bins_num - 1`` intervals (and centers).
        self.bins = np.linspace(min_openness, max_openness, bins_num)
        self.bin_centers = (self.bins[:-1] + self.bins[1:]) / 2.0

    def discretize(self, openness: float):
        """Clip ``openness`` to the valid range and return its bin index.

        Returns:
            Index (or indices, for array input) produced by ``np.digitize``
            against ``self.bins``; after clipping they lie in
            ``[1, bins_num]``.
        """
        clipped = np.clip(openness, a_min=self.min_openness, a_max=self.max_openness)
        return np.digitize(clipped, self.bins)

    def undiscretize(self, discrete_openness):
        """Map bin index/indices back to the center of the bin interval.

        Indices are shifted by -1 and clipped so that the largest index
        produced by ``discretize`` falls on the last bin center.

        Args:
            discrete_openness: Integer bin index or array-like of indices
                (list, tuple or ``np.ndarray``).
        """
        # np.asarray allows plain lists/tuples; ``list - 1`` raises TypeError.
        shifted = np.asarray(discrete_openness) - 1
        center_index = np.clip(shifted, a_min=0, a_max=self.bin_centers.shape[0] - 1)
        return self.bin_centers[center_index]

    def get_action_space(self):
        """Return the number of bins."""
        return self.bins_num

    def generate_discrete_special_tokens(self) -> List[str]:
        """Return the tokens ``<open0>`` ... ``<open{bins_num - 1}>``."""
        return [f"<open{i}>" for i in range(self.bins_num)]

    def map_openness_to_special_tokens(self, openness) -> List[str]:
        """Discretize ``openness`` and render it as ``<openN>`` token(s).

        A scalar yields a single-element list. Array-like input yields one
        token per element rather than a single malformed token such as
        ``<open[3]>``.
        """
        indices = np.atleast_1d(self.discretize(openness)).ravel()
        return [f"<open{index}>" for index in indices]
def load_image_from_base64(image: Union[bytes, str]) -> Image.Image:
    """Decode a base64-encoded payload and open it as a PIL image."""
    raw_bytes = base64.b64decode(image)
    buffer = BytesIO(raw_bytes)
    return Image.open(buffer)
def load_image(image_url: Union[str, Image.Image]) -> Image.Image:
    """Load an image from a PIL image object or a local path, as RGB.

    Args:
        image_url: Either an already-opened ``PIL.Image.Image`` or something
            ``Image.open`` accepts (e.g. a local file path). Remote URLs are
            not fetched by this function.

    Returns:
        The image converted to RGB. If opening or converting fails, the
        error is printed and a 32x32 RGB placeholder image is returned
        instead of raising.
    """
    try:
        if isinstance(image_url, Image.Image):
            img = image_url
        else:
            img = Image.open(image_url)
        img = img.convert('RGB')
    except Exception as error:
        # Deliberate best-effort: report the failure and fall back to a
        # placeholder so that a single bad image does not abort the caller.
        if isinstance(image_url, str) and len(image_url) > 100:
            # Keep the log line short for very long inputs.
            image_url = image_url[:100] + ' ...'
        print(f'{error}, image_url={image_url}')
        img = Image.new('RGB', (32, 32))
    return img
# ---------------------------------------------------------------------------
# Inference demo: build a two-image prompt containing discretized camera
# poses, query the model, and decode the predicted discrete action.
# ``IMAGE_TOKEN`` comes from ``lmdeploy.vl.constants`` (imported at the top of
# the file); without that import the prompt construction raises NameError.
# ---------------------------------------------------------------------------
model = '/mnt/petrelfs/huangsiyuan/VLA/InternVL/internvl_chat/output/internvla_8b_1node_with_visual_traces_wo_sp_token_w_cam/VLA8B_V1'
pipe = pipeline(
    model,
    backend_config=TurbomindEngineConfig(session_len=2048),
    chat_template_config=ChatTemplateConfig(model_name='internvl2-internlm2'),
)

# Value ranges handed to the discretizers below.
TRANS_MAX = 0.275
TRANS_MIN = -0.275
ROT_MIN = -0.350
ROT_MAX = 0.395
OPEN_MIN = -0.388
OPEN_MAX = 0.300

translation_bins_num = 256
rotation_bins_num = 256
openness_bins_num = 256

translation_action_discretizer = TranslationActionDiscretizer(
    bins_num=translation_bins_num, max_action=TRANS_MAX, min_action=TRANS_MIN
)
# NOTE: RotationActionDiscretizer accepts min_action/max_action but ignores them.
rotation_action_discretizer = RotationActionDiscretizer(
    bins_num=rotation_bins_num, min_action=ROT_MIN, max_action=ROT_MAX
)
openness_action_discretizer = OpennessActionDiscretizer(
    bins_num=openness_bins_num, min_openness=OPEN_MIN, max_openness=OPEN_MAX
)

# Prompt template; each CAM_PARAM placeholder and the TASK placeholder are
# substituted below.
VQA_FORMAT = f"{IMAGE_TOKEN}\n {IMAGE_TOKEN}\n Given the observation images from the wrist camera mounted at CAM_PARAM and the overhead camera mounted at CAM_PARAM, please provide the action that the robot should take to finish the task: TASK"

# Camera poses: first three values are discretized as a translation, the
# last three as Euler angles in radians (degrees=False is the default).
wrist_cam_pose = [0.3618544138321802, -0.08323374464523976, 0.41759402329169787, 2.6584232953914344, 0.035482430406705845, 1.2906347836099603]
overhead_cam_pose = [-0.09877916942983442, -0.3919519409041736, 0.4780865865815033, -1.8237694898473762, -0.012183613523460979, -0.746683044221379]
cam_pose_list = [wrist_cam_pose, overhead_cam_pose]

for cam_pose in cam_pose_list:
    cam_xyz_token = translation_action_discretizer.discretize(np.array(cam_pose[:3]))
    cam_rpy_token = rotation_action_discretizer.discretize(np.array(cam_pose[3:6]))
    cam_action_tokens = [*cam_xyz_token, *cam_rpy_token]
    cam_action_tokens_str = "<cam>[" + ",".join(map(str, cam_action_tokens)) + "]</cam>"
    # count=1: fill only the first remaining placeholder, so the poses are
    # substituted in list order (wrist first, then overhead).
    VQA_FORMAT = VQA_FORMAT.replace("CAM_PARAM", cam_action_tokens_str, 1)

task = "Pick up the green object from the table and put it in the bowl"
VQA_FORMAT = VQA_FORMAT.replace("TASK", task)

img1 = "/mnt/petrelfs/huangsiyuan/VLA/droid_action_tasks_internvl/sample_images/2_0.png"
img2 = "/mnt/petrelfs/huangsiyuan/VLA/droid_action_tasks_internvl/sample_images/2_1.png"
images = [load_image(img1), load_image(img2)]

response = pipe((VQA_FORMAT, images))
print(response.text)
print("gt: [124,137,104,126,130,129,233]")

# The response text is parsed as a Python literal (a list of 7 integers is
# expected); ast.literal_eval raises if the model output is anything else.
action_list = np.array(ast.literal_eval(response.text))
xyz = translation_action_discretizer.undiscretize(action_list[:3])
# NOTE: despite its name, ``rpy`` holds the normalized quaternion returned by
# RotationActionDiscretizer.undiscretize, not roll/pitch/yaw angles.
rpy = rotation_action_discretizer.undiscretize(action_list[3:6])
openness = openness_action_discretizer.undiscretize(action_list[6])
print(f"xyz: {xyz}, rpy: {rpy}, openness: {openness}")