|
import base64
import html
import io
import os
import shutil

import gradio as gr
from modelscope import snapshot_download
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from PIL import Image, ImageDraw

from MobileAgent.icon_localization import det
from MobileAgent.local_server import mobile_agent_infer
from MobileAgent.text_localization import ocr
|
|
|
|
|
# Inline stylesheet that is prepended to every chat HTML fragment returned by
# chatbot(); it styles the chat container, the user/bot bubbles and the
# embedded screenshot thumbnails.
chatbot_css = """
<style>
.chat-container {
display: flex;
flex-direction: column;
overflow-y: auto;
max-height: 630px;
margin: 10px;
}
.user-message, .bot-message {
margin: 5px;
padding: 10px;
border-radius: 10px;
}
.user-message {
text-align: right;
background-color: #7B68EE;
color: white;
align-self: flex-end;
}
.bot-message {
text-align: left;
background-color: #ADD8E6;
color: black;
align-self: flex-start;
}
.user-image {
text-align: right;
align-self: flex-end;
max-width: 150px;
max-height: 300px;
}
.bot-image {
text-align: left;
align-self: flex-start;
max-width: 200px;
max-height: 400px;
}
</style>
"""
|
|
|
|
|
# Working directories, relative to the current working directory:
#   temp_file  - scratch folder for cropped icon images (emptied after each turn)
#   screenshot - created here; not otherwise used in this file
#   cache      - numbered copies of the screenshots uploaded by the user
temp_file = "temp"
screenshot = "screenshot"
cache = "cache"

# exist_ok=True makes creation idempotent and avoids the check-then-create
# race of "if not exists: mkdir".
for _workdir in (temp_file, screenshot, cache):
    os.makedirs(_workdir, exist_ok=True)
|
|
|
|
|
# Models are loaded once at import time and shared by every request.

# GroundingDINO checkpoint, used through det() to locate icons.
groundingdino_dir = snapshot_download(
    "AI-ModelScope/GroundingDINO",
    revision="v1.0.0",
)
groundingdino_model = pipeline(
    "grounding-dino-task",
    model=groundingdino_dir,
)

# OCR is split into a detection pipeline and a recognition pipeline; both are
# handed to ocr().
ocr_detection = pipeline(
    Tasks.ocr_detection,
    model="damo/cv_resnet18_ocr-detection-line-level_damo",
)
ocr_recognition = pipeline(
    Tasks.ocr_recognition,
    model="damo/cv_convnextTiny_ocr-recognition-document_damo",
)
|
|
|
|
|
def encode_image(image_path):
    """Return the contents of the file at *image_path* as a base64 string.

    The file is read in binary mode and the base64 bytes are decoded as UTF-8.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
|
|
|
|
|
def get_all_files_in_folder(folder_path):
    """Return the names of all entries directly inside *folder_path*.

    Names are returned as given by os.listdir (no directory prefix, arbitrary
    order); sub-directories are not filtered out.
    """
    return list(os.listdir(folder_path))
|
|
|
|
|
def crop(image, box, i):
    """Save the region *box* of an image file as ``<i>.png`` in the temp folder.

    Args:
        image: Path (or file object) accepted by ``Image.open``.
        box: Sequence whose first four items are ``x1, y1, x2, y2``; each is
            truncated to int.
        i: Identifier used as the output file name.

    Returns:
        None. Boxes that are not at least 10 px wide and 10 px tall are
        skipped and nothing is written.
    """
    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
    # Reject degenerate boxes before touching the file system.
    if x1 >= x2 - 10 or y1 >= y2 - 10:
        return

    # The context manager closes the underlying file handle, which a bare
    # Image.open() would leave open.
    with Image.open(image) as source:
        cropped_image = source.crop((x1, y1, x2, y2))
        # Write into the same folder that the caption step lists (temp_file).
        cropped_image.save(os.path.join(temp_file, f"{i}.png"), format="PNG")
|
|
|
|
|
def merge_text_blocks(text_list, coordinates_list):
    """Join OCR lines that are stacked directly under one another.

    Boxes are ``[x1, y1, x2, y2]``. After sorting by ``(y1, x1)``, a box is
    chained onto the current group when, compared with the group's last box,
    its left edge differs by less than 10, its top lies between 10 above and
    30 below (exclusive) the previous bottom, and the two heights differ by
    less than 10.

    Args:
        text_list: Recognised strings, parallel to *coordinates_list*.
        coordinates_list: One box per string.

    Returns:
        ``(merged_text_blocks, merged_coordinates)``: the texts of each group
        joined with newlines, and the bounding box enclosing each group.
    """
    reading_order = sorted(
        range(len(coordinates_list)),
        key=lambda idx: (coordinates_list[idx][1], coordinates_list[idx][0]),
    )
    texts = [text_list[idx] for idx in reading_order]
    boxes = [coordinates_list[idx] for idx in reading_order]

    total = len(texts)
    consumed = [False] * total
    merged_text_blocks = []
    merged_coordinates = []

    for start in range(total):
        if consumed[start]:
            continue

        # "tail" is the most recently chained box; candidates are compared
        # against it, not against the first box of the group.
        tail = start
        group_lines = [texts[start]]
        group_boxes = [boxes[start]]

        for candidate in range(start + 1, total):
            if consumed[candidate]:
                continue

            tail_box = boxes[tail]
            cand_box = boxes[candidate]
            left_aligned = abs(tail_box[0] - cand_box[0]) < 10
            vertical_gap = cand_box[1] - tail_box[3]
            tail_height = tail_box[3] - tail_box[1]
            cand_height = cand_box[3] - cand_box[1]
            similar_height = abs(tail_height - cand_height) < 10

            if left_aligned and -10 <= vertical_gap < 30 and similar_height:
                group_lines.append(texts[candidate])
                group_boxes.append(cand_box)
                consumed[tail] = True
                consumed[candidate] = True
                tail = candidate

        merged_text_blocks.append("\n".join(group_lines))
        merged_coordinates.append([
            min(box[0] for box in group_boxes),
            min(box[1] for box in group_boxes),
            max(box[2] for box in group_boxes),
            max(box[3] for box in group_boxes),
        ])

    return merged_text_blocks, merged_coordinates
|
|
|
|
|
def get_perception_infos(screenshot_file):
    """Collect text and icon elements found on a screenshot.

    Steps, in order: run OCR and merge stacked lines; detect icons; crop every
    icon into the temp folder; send all crops in one "caption" request to
    ``mobile_agent_infer``; finally replace every box by its centre point.

    Args:
        screenshot_file: Path of the screenshot image.

    Returns:
        ``(perception_infos, width, height)`` where *perception_infos* is a
        list of ``{"text": ..., "coordinates": [cx, cy]}`` dicts. Text entries
        are prefixed with ``"text: "``; icon entries are ``"icon: <caption>"``
        when a caption came back and plain ``"icon"`` otherwise.
    """
    # Only the size is needed; the context manager releases the file handle.
    with Image.open(screenshot_file) as screenshot_image:
        width, height = screenshot_image.size

    text, coordinates = ocr(screenshot_file, ocr_detection, ocr_recognition)
    text, coordinates = merge_text_blocks(text, coordinates)
    perception_infos = [
        {"text": "text: " + block_text, "coordinates": block_box}
        for block_text, block_box in zip(text, coordinates)
    ]

    icon_boxes = det(screenshot_file, "icon", groundingdino_model)
    perception_infos.extend(
        {"text": "icon", "coordinates": icon_box} for icon_box in icon_boxes
    )

    # Crop each icon under its index in perception_infos so the caption can be
    # written back to the right entry. crop() silently skips tiny boxes, so
    # not every icon index ends up with a file.
    for info_index, info in enumerate(perception_infos):
        if info["text"] == "icon":
            crop(screenshot_file, info["coordinates"], info_index)

    def numeric_id(file_name):
        # "12.png" -> 12
        return int(file_name.split('/')[-1].split('.')[0])

    images = get_all_files_in_folder(temp_file)
    if images:
        images = sorted(images, key=numeric_id)
        image_id = [numeric_id(file_name) for file_name in images]
        prompt = 'This image is an icon from a phone screen. Please briefly describe the shape and color of this icon in one sentence.'

        string_image = [
            {
                "image_name": file_name,
                "image_file": encode_image(os.path.join(temp_file, file_name)),
            }
            for file_name in images
        ]
        query_data = {"task": "caption", "images": string_image, "query": prompt}
        response_query = mobile_agent_infer(query_data)
        icon_map = response_query["icon_map"]

        # The code looks captions up by the 1-based position of the image in
        # the sorted request (as a string key), not by the image's file id.
        for position, info_index in enumerate(image_id, start=1):
            caption = icon_map.get(str(position))
            if caption:
                perception_infos[info_index]['text'] = "icon: " + caption

    # Replace each [x1, y1, x2, y2] box by its integer centre point.
    for info in perception_infos:
        x1, y1, x2, y2 = info['coordinates'][:4]
        info['coordinates'] = [int((x1 + x2) / 2), int((y1 + y2) / 2)]

    return perception_infos, width, height
|
|
|
|
|
def image_to_base64(image):
    """Render *image* as an HTML ``<img>`` tag with an inline PNG data URI.

    *image* only needs a ``save(file_obj, format=...)`` method, as PIL images
    have; it is serialised as PNG into memory and base64-encoded.
    """
    png_buffer = io.BytesIO()
    image.save(png_buffer, format="PNG")
    payload = base64.b64encode(png_buffer.getvalue()).decode("utf-8")
    return '<img src="data:image/png;base64,' + payload + '" />'
|
|
|
|
|
def _bot_message(text):
    """Wrap *text* in a bot chat bubble, escaping it so it cannot inject markup."""
    return "<div class='bot-message'>{}</div>".format(html.escape(text, quote=False))


def _flatten_section(section):
    """Put a section of the model reply on one line and collapse double spaces."""
    return section.replace("\n", " ").replace("  ", " ").strip()


def _parse_point(fragment):
    """Parse ``"x, y"`` into a pair of ints; return None when it is malformed."""
    parts = fragment.split(", ")
    try:
        return int(parts[0]), int(parts[1])
    except (ValueError, IndexError):
        return None


def _mark_point(image, x, y, color):
    """Draw a circle outline of radius 75 in *color* around (x, y) on *image*."""
    radius = 75
    ImageDraw.Draw(image).ellipse(
        [x - radius, y - radius, x + radius, y + radius], outline=color, width=10
    )


def _reserve_cache_image_id():
    """Return the next numeric screenshot id, evicting the oldest file past 100."""
    cached_names = get_all_files_in_folder(cache)
    if not cached_names:
        return 1
    cached_ids = sorted(int(name.split('/')[-1].split('.')[0]) for name in cached_names)
    if len(cached_ids) > 100:
        os.remove(os.path.join(cache, str(cached_ids[0]) + ".png"))
    return cached_ids[-1] + 1


def _instruct_user(action, image, screenshot_file):
    """Translate the model's action into a user instruction, marking *image*.

    Tap / Open app / Swipe targets are drawn onto *image* in place. The order
    of the keyword checks is significant and mirrors the original chain.
    """
    malformed = "Sorry, the coordinates of the next operation could not be understood. Please upload the current screenshot again."

    if "Open app" in action:
        app_name = action.split("(")[-1].split(")")[0]
        text, coordinate = ocr(screenshot_file, ocr_detection, ocr_recognition)
        # Circle the first OCR line that exactly equals the app name, if any.
        for line, box in zip(text, coordinate):
            if app_name == line:
                _mark_point(image, int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2), 'red')
                break
        return "Please click the red circle and upload the current screenshot again."

    if "Tap" in action:
        point = _parse_point(action.split("(")[-1].split(")")[0])
        if point is None:
            return malformed
        _mark_point(image, point[0], point[1], 'red')
        return "Please click the red circle and upload the current screenshot again."

    if "Swipe" in action:
        start = _parse_point(action.split("Swipe (")[-1].split("), (")[0])
        end = _parse_point(action.split("), (")[-1].split(")")[0])
        if start is None or end is None:
            return malformed
        _mark_point(image, start[0], start[1], 'red')
        _mark_point(image, end[0], end[1], 'blue')
        return "Please slide from red circle to blue circle and upload the current screenshot again."

    if "Type" in action:
        if "(text)" not in action:
            text = action.split("(")[-1].split(")")[0]
        else:
            text = action.split(" \"")[-1].split("\"")[0]
        return f"Please type the \"{text}\" and upload the current screenshot again."

    if "Back" in action:
        return "Please back to previous page and upload the current screenshot again."

    if "Home" in action:
        return "Please back to home page and upload the current screenshot again."

    if "Stop" in action:
        return "Task completed."

    # Previously an unrecognised action left bot_response unbound and crashed.
    return "Sorry, the next operation could not be understood. Please upload the current screenshot again."


def chatbot(image, instruction, add_info, history, chat_log):
    """Run one turn of the agent on an uploaded screenshot.

    The screenshot is cached and analysed, a "decision" request and then a
    "planning" request are sent through ``mobile_agent_infer``, and the reply
    is rendered as chat HTML.

    Args:
        image: Screenshot object with a ``save`` method that ImageDraw can draw
            on (the UI supplies a PIL image), or None when nothing was uploaded.
        instruction: The user's task description.
        add_info: Extra operation knowledge forwarded to the model.
        history: Per-session state dict; empty on the first turn. Mutated in
            place and also returned.
        chat_log: List of HTML fragments rendered so far. Mutated in place and
            also returned.

    Returns:
        ``(html, history, chat_log)`` where *html* is the stylesheet followed
        by the chat container.
    """
    # Without a screenshot there is nothing to analyse; answer instead of
    # crashing on image.save().
    if image is None:
        notice = _bot_message("Please upload a screenshot before submitting.")
        chat_html = "<div class='chat-container'>{}</div>".format("".join(chat_log) + notice)
        return chatbot_css + chat_html, history, chat_log

    if not history:
        thought_history = []
        summary_history = []
        action_history = []
        summary = ""
        action = ""
        completed_requirements = ""
        memory = ""
        insight = ""
        error_flag = False
        user_text = instruction
    else:
        thought_history = history["thought_history"]
        summary_history = history["summary_history"]
        action_history = history["action_history"]
        summary = history["summary"]
        action = history["action"]
        # May be absent if an earlier turn stopped before planning finished.
        completed_requirements = history.get("completed_requirements", "")
        stored_memory = history["memory"]
        # Older session state stored memory as a 1-tuple (stray trailing comma).
        memory = stored_memory[0] if isinstance(stored_memory, (tuple, list)) else stored_memory
        insight = history["insight"]
        error_flag = history["error_flag"]
        user_text = "I have uploaded the screenshot. Please continue operating."
    user_msg = "<div class='user-message'>{}</div>".format(html.escape(user_text, quote=False))

    cur_image_id = _reserve_cache_image_id()
    screenshot_file = os.path.join(cache, str(cur_image_id) + ".png")
    image.save(screenshot_file, format="PNG")
    try:
        perception_infos, width, height = get_perception_infos(screenshot_file)
    finally:
        # Always empty the icon scratch folder so stale crops from a failed
        # turn cannot leak into the next one.
        shutil.rmtree(temp_file, ignore_errors=True)
        os.makedirs(temp_file, exist_ok=True)

    query_data = {
        "task": "decision",
        "screenshot_file": encode_image(screenshot_file),
        "instruction": instruction,
        "perception_infos": perception_infos,
        "width": width,
        "height": height,
        "summary_history": summary_history,
        "action_history": action_history,
        "summary": summary,
        "action": action,
        "add_info": add_info,
        "error_flag": error_flag,
        "completed_requirements": completed_requirements,
        "memory": memory,
        "memory_switch": True,
        "insight": insight
    }

    response_query = mobile_agent_infer(query_data)
    output_action = response_query["decision"]
    output_memory = response_query["memory"]
    if output_action == "No token":
        chat_html = "<div class='chat-container'>{}</div>".format(
            _bot_message("Sorry, the resources can be exhausted today.")
        )
        return chatbot_css + chat_html, history, chat_log

    # The reply is expected to carry "### Thought ###", "### Action ###" and
    # "### Operation ###" sections, in that order.
    thought = _flatten_section(
        output_action.split("### Thought ###")[-1].split("### Action ###")[0].replace(":", "")
    )
    summary = _flatten_section(output_action.split("### Operation ###")[-1])
    action = _flatten_section(
        output_action.split("### Action ###")[-1].split("### Operation ###")[0]
    )

    output_memory = output_memory.split("### Important content ###")[-1].split("\n\n")[0].strip() + "\n"
    if "None" not in output_memory and output_memory not in memory:
        memory += output_memory

    bot_response = _instruct_user(action, image, screenshot_file)

    bot_text1 = _bot_message("### Decision ###")
    bot_thought = _bot_message("Thought: " + thought)
    bot_action = _bot_message("Action: " + action)
    bot_operation = _bot_message("Operation: " + summary)
    bot_text2 = _bot_message("### Memory ###")
    bot_memory = _bot_message(output_memory)
    # Show the (possibly annotated) screenshot above the instruction.
    bot_response = "<div class='bot-image'>{}</div>".format(image_to_base64(image)) + _bot_message(bot_response)

    chat_log.append(user_msg)

    thought_history.append(thought)
    summary_history.append(summary)
    action_history.append(action)

    history["thought_history"] = thought_history
    history["summary_history"] = summary_history
    history["action_history"] = action_history
    history["summary"] = summary
    history["action"] = action
    history["memory"] = memory
    history["memory_switch"] = True
    history["insight"] = insight
    history["error_flag"] = error_flag
    # Keep the key present even if planning below ends early.
    history["completed_requirements"] = completed_requirements

    query_data = {
        "task": "planning",
        "instruction": instruction,
        "thought_history": thought_history,
        "summary_history": summary_history,
        "action_history": action_history,
        # NOTE(review): always sent empty, as in the original; confirm whether
        # the previous completed_requirements should be passed instead.
        "completed_requirements": "",
        "add_info": add_info
    }

    response_query = mobile_agent_infer(query_data)
    output_planning = response_query["planning"]
    if output_planning == "No token":
        chat_html = "<div class='chat-container'>{}</div>".format(
            _bot_message("Sorry, the resources can be exhausted today.")
        )
        return chatbot_css + chat_html, history, chat_log

    output_planning = output_planning.split("### Completed contents ###")[-1].replace("\n", " ").strip()
    history["completed_requirements"] = output_planning

    chat_log.extend([
        _bot_message("### Planning ###"),
        _bot_message(output_planning),
        bot_text1,
        bot_thought,
        bot_action,
        bot_operation,
        bot_text2,
        bot_memory,
        bot_response,
    ])

    chat_html = "<div class='chat-container'>{}</div>".format("".join(chat_log))

    return chatbot_css + chat_html, history, chat_log
|
|
|
|
|
def lock_input(instruction):
    """Return two Gradio updates: a locked instruction value and a cleared value.

    The first update keeps *instruction* as the value and makes the component
    non-interactive; the second sets a component's value to None. Which
    components receive them depends on the event wiring (no caller is visible
    in this file).
    """
    locked_instruction = gr.update(value=instruction, interactive=False)
    cleared_value = gr.update(value=None)
    return locked_instruction, cleared_value
|
|
|
|
|
def reset_demo():
    """Return the values that put the demo back into its initial state.

    In order: an empty, editable instruction box; the knowledge box with its
    default hint, editable; an empty chat container; an empty history dict;
    an empty chat log.
    """
    instruction_reset = gr.update(value="", interactive=True)
    knowledge_reset = gr.update(
        value="If you want to tap an icon of an app, use the action \"Open app\"",
        interactive=True,
    )
    empty_chat_html = "<div class='chat-container'></div>"
    return instruction_reset, knowledge_reset, empty_chat_html, {}, []
|
|
|
|
|
# Markdown shown in the left column: project badges followed by usage
# instructions in English and Chinese.
tos_markdown = """<div style="display:flex; gap: 0.25rem;" align="center">
<a href='https://github.com/X-PLUG/MobileAgent'><img src='https://img.shields.io/badge/Github-Code-blue'></a>
<a href="https://arxiv.org/abs/2406.01014"><img src="https://img.shields.io/badge/Arxiv-2406.01014-red"></a>
<a href='https://github.com/X-PLUG/MobileAgent/stargazers'><img src='https://img.shields.io/github/stars/X-PLUG/MobileAgent.svg?style=social'></a>
</div>
If you like our project, please give us a star ✨ on Github for latest update.

**Terms of use**
1. Input your instruction in \"Instruction\", for example \"Turn on the dark mode\".
2. You can input helpful operation knowledge in \"Knowledge\".
3. Click \"Submit\" to get the operation. You need to operate your mobile device according to the operation and then upload the screenshot after your operation.
4. The 5 cases in \"Examples\" are a complete flow. Click and submit from top to bottom to experience.
5. Due to limited resources, each operation may take a long time, please be patient and wait.

**使用说明**
1. 在“Instruction”中输入你的指令,例如“打开深色模式”。
2. 你可以在“Knowledge”中输入帮助性的操作知识。
3. 点击“Submit”来获得操作。你需要根据输出来操作手机,并且上传操作后的截图。
4. “Example”中的5个例子是一个任务。从上到下点击它们并且点击“Submit”来体验。
5. 由于资源有限,每次操作的时间会比较长,请耐心等待。"""

# Page heading. The identifier keeps its historical spelling because the UI
# code refers to it by this name.
title_markdowm = "# Mobile-Agent-v2: Mobile Device Operation Assistant with Effective Navigation via Multi-Agent Collaboration"
|
|
|
# The two text boxes are created before the layout so gr.Examples can refer to
# them; they are placed on the page later through .render().
instruction_input = gr.Textbox(
    label="Instruction",
    placeholder="Input your instruction",
)
knowledge_input = gr.Textbox(
    label="Knowledge",
    placeholder="Input your knowledge",
    value="If you want to tap an icon of an app, use the action \"Open app\"",
)

# Screenshots of the bundled walkthrough; every row uses the same instruction.
_EXAMPLE_SCREENSHOTS = (
    "./example/example_1.jpg",
    "./example/example_2.jpg",
    "./example/example_3.jpg",
    "./example/example_4.jpg",
    "./example/example_5.jpg",
)

with gr.Blocks() as demo:
    # Per-session state: the agent history dict and the rendered chat log.
    history_state = gr.State(value={})
    history_output = gr.State(value=[])

    with gr.Row():
        gr.Markdown(title_markdowm)

    with gr.Row():
        # Left column: instructions, screenshot upload and examples.
        with gr.Column(scale=5):
            gr.Markdown(tos_markdown)
            with gr.Row():
                image_input = gr.Image(label="Screenshot", type="pil", height=550, width=230)
                gr.Examples(
                    examples=[
                        [screenshot_path, "Turn on the dark mode"]
                        for screenshot_path in _EXAMPLE_SCREENSHOTS
                    ],
                    inputs=[image_input, instruction_input, knowledge_input],
                )

        # Right column: text inputs, buttons and the chat transcript.
        with gr.Column(scale=6):
            instruction_input.render()
            knowledge_input.render()
            with gr.Row():
                start_button = gr.Button("Submit")
                clear_button = gr.Button("Clear")
            output_component = gr.HTML(
                label="Chat history",
                value="<div class='chat-container'></div>",
            )

    # Submit runs one agent turn and stores the updated state.
    start_button.click(
        fn=lambda image, instruction, add_info, history, output: chatbot(image, instruction, add_info, history, output),
        inputs=[image_input, instruction_input, knowledge_input, history_state, history_output],
        outputs=[output_component, history_state, history_output],
    )

    # Clear restores both text boxes, the transcript and the session state.
    clear_button.click(
        fn=reset_demo,
        inputs=[],
        outputs=[instruction_input, knowledge_input, output_component, history_state, history_output],
    )

# Launched at import time with a public share link, as in the original script.
demo.queue().launch(share=True)