# File size: 11,822 Bytes
# 9ffc0a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import torch
import transformers
import gradio as gr

class MRGenerator:
    """Turn a traffic rule into an MR plus a diffusion inpainting prompt.

    A chat text-generation pipeline is queried several times: once each for
    the maneuver, the road_network and the object_environment named by the
    rule, once to combine those three into the Given/When/Then "MR" text, and
    once to produce a short scene prompt.  ("MR" is not expanded anywhere in
    this file.)
    """

    def __init__(self):
        """Load the generation pipeline and set the prompt/sampling state."""
        checkpoint = "arcee-ai/Llama-3.1-SuperNova-Lite"
        dtype_options = {"torch_dtype": torch.bfloat16}
        self.pipeline = transformers.pipeline(
            "text-generation",
            model=checkpoint,
            model_kwargs=dtype_options,
            device_map="auto",
        )

        # Generation stops on either the tokenizer's EOS id or <|eot_id|>.
        tokenizer = self.pipeline.tokenizer
        self.terminators = [
            tokenizer.eos_token_id,
            tokenizer.convert_tokens_to_ids("<|eot_id|>"),
        ]

        # System message shared by every find_* / combine_to_MR request.
        self.system_prompt = {
            "role": "system",
            "content": '''# CONTEXT #You are an expert in traffic rules and scene analysis.
                            #Key Concepts# 1. traffic rule: Define how the ego-vehicle should maneuver in the specific driving scenario. The ontology elements in driving scenario are classified into road_network and object_environment.
                            2. maneuver: A specific action or movement that an ego-vehicle performs.
                            3. road_network: Road elements are specified in the traffic rule, such as lanes, lines and crosswalks.
                            4. object_environment: One object or environment is specified in the traffic rule.''',
        }

        # Sampling settings forwarded to the pipeline by LLM().
        self.max_new_tokens = 1024
        self.do_sample = True
        self.temperature = 0.1
        self.top_p = 0.9

    def _ask_with_system_prompt(self, content):
        """Send ``content`` as one user message after the shared system prompt."""
        return self.LLM([self.system_prompt, {"role": "user", "content": content}])

    def find_maneuver(self, prompt):
        """Ask the model which maneuver the traffic rule implies.

        Args:
            prompt: Traffic-rule text, interpolated into the request.

        Returns:
            The model's reply exactly as returned by ``LLM`` (not validated
            against the maneuver list given in the request).
        """
        request = f"""
            # OBJECTIVE #  Your task is to find the ego-vehicle's most dangerous maneuver in the traffic rules.
            # STYLE # The supported maneuvers are limited to the following types:
            slow down, stop, turn, turn left, turn right, keep the same.
            # NOTICE # Some maneuvers could lead to the listed types, e.g., 'yield' can be interpreted as 'slow down'.
            Generate answer for maneuver only. Do not generate other output.
            # EXAMPLE # 
            Example Text: If you are driving on an unpaved road that intersects with a paved road, you must yield the right-of-way to vehicles traveling on the paved road.
            Example Answer: slow down
            Example Text: Steady Red Light (Stop) Stop before entering the crosswalk or intersection. You may turn right unless prohibited by law. You may also turn left if both streets are one way, unless prohibited by law. You must yield to all pedestrians and other traffic lawfully using the intersection.
            Example Answer: stop
            ===== END OF EXAMPLE ======
            Text: {prompt}
            Answer: """
        return self._ask_with_system_prompt(request)

    def find_road_network(self, prompt):
        """Ask the model which road_network the traffic rule refers to.

        Args:
            prompt: Traffic-rule text, interpolated into the request.

        Returns:
            The model's reply exactly as returned by ``LLM``.
        """
        request = f"""
            # OBJECTIVE #  Your task is to find the most dangerous road_network where ego-vehicle will violate the traffic rule.
            # NOTICE # Generate answer for road_network only. Do not generate other output. Before answering, verify the road_network describes road, not traffic participants.
            road_network only refers to road types, such as: intersection, one-way street, two-way street, roundabout, highway, freeway, residential street, rural road, urban street, bridge, tunnel, parking lot, alley, T-junction, divided highway, bike lane, etc. 
            The answer must be ONE or TWO words only.
            road_network should not conflict with object_environments. For instance, avoid conflicts between a red arrow light and the lane where a red cross-shaped light or arrow light is active. In case of any conflicts, set road_network to "any road". The answer should not exceed three words.
            # EXAMPLE # 
            Example Text: If you are driving on an unpaved road that intersects with a paved road, you must yield the right-of-way to vehicles traveling on the paved road.
            Example Answer: unpaved road 
            Example Text: Steady Red Light (Stop) Stop before entering the crosswalk or intersection. You may turn right unless prohibited by law. You may also turn left if both streets are one way, unless prohibited by law. You must yield to all pedestrians and other traffic lawfully using the intersection.
            Example Answer: crosswalk
            ===== END OF EXAMPLE ======
            Text: {prompt}
            Answer: """
        return self._ask_with_system_prompt(request)

    def find_object_environment(self, prompt):
        """Ask the model for the object_environment and keep the first item.

        Args:
            prompt: Traffic-rule text, interpolated into the request.

        Returns:
            The first comma-separated item of the reply, with surrounding
            whitespace and quote characters removed.
        """
        request = f"""
                    # OBJECTIVE #  Your task is to find all suitable object_environment, which will lead ego-vehicle to take the most dangerous maneuver.
                    # NOTICE # Generate answer for object_environment only. Do not generate other output.
                    Each item in the answer must be a single object_environment. Sentences containing "or" should be split into separate content.
                    Before answering, verify the logic: "In the road_network, object_environment cause the ego-vehicle to maneuver". If this logic doesn't hold true, please don't output this object_environment.
                    Every object_environment must be ONE to three words only.
                    # EXAMPLE # 
                    Example Text: If you are driving on an unpaved road that intersects with a paved road, you must yield the right-of-way to vehicles traveling on the paved road.
                    Example Answer: "vehicle"
                    Example Text: Steady Red Light (Stop) Stop before entering the crosswalk or intersection. You may turn right unless prohibited by law. You may also turn left if both streets are one way, unless prohibited by law. You must yield to all pedestrians and other traffic lawfully using the intersection.
                    Example Answer:  "red light"
                    Example Text: Lane signal lights indicate: (1) When the green arrow light is on, allow vehicles in the lane to pass in the direction indicated; (2) When the red cross-shaped light or arrow light is on, vehicles in the lane are prohibited from passing.
                    Example Answer: "red light"
                    Example Text: Flashing red light: Vehicles and streetcars/trams must stop at the stopping point before proceeding.
                    Example Answer: "flashing red light"
                    Example Text: Slow down on wet road. Do not suddenly turn, speed up, or stop.
                    Example Answer: "Wet"
                    ===== END OF EXAMPLE ======
                    Text: {prompt}
                    Answer: """
        reply = self._ask_with_system_prompt(request)
        # Only the first comma-separated entry is used; any further entries
        # in the reply are discarded.
        leading_item = reply.strip('"').split(',')[0]
        return leading_item.strip().strip('"').strip("'")

    def combine_to_MR(self, maneuver, road_network, object_environment):
        """Ask the model to merge the three components into the MR text.

        Args:
            maneuver: Maneuver string placed into the request.
            road_network: Road-network string placed into the request.
            object_environment: Object/environment string placed into the
                request.

        Returns:
            The model's reply exactly as returned by ``LLM``; the examples in
            the request use a Given / When / Then layout.
        """
        request = f"""
            # OBJECTIVE
            Your task is to combine vehicle maneuver, road_network, object_environments into MR.
            # EXAMPLE
            Example Text:  maneuver: "slow down", road_network: "unpaved road", object_environment: "vehicle"
            Example Answer:Given the unpaved road
            When ITMI add vehicle
            Then ego-vehicle should slow down
            Example Text:  maneuver: "stop", road_network: "crosswalk", object_environment: "red light"
            Example Answer:Given the crosswalk
            When ITMI add red light
            Then ego-vehicle should stop
            User: maneuver: "{maneuver}", road_network: "{road_network}", object_environment: "{object_environment}"
            """
        return self._ask_with_system_prompt(request)

    def find_prompt(self, road_network, object_environment):
        """Ask the model for a diffusion inpainting prompt describing the scene.

        This request does not use ``self.system_prompt``; it sends its own
        system message followed by three example user/assistant exchanges.

        Args:
            road_network: Road-network string placed into the final user turn.
            object_environment: Object/environment string placed into the
                final user turn.

        Returns:
            The model's reply exactly as returned by ``LLM``.
        """
        request = {
            "role": "user",
            "content": f'''road_network:{road_network},objects_environment:{object_environment}''',
        }
        instructions = {"role": "system", "content": '''Generate a sample diffusion inpainting prompt based on the given traffic rule and scenario. 
            Provide ONLY the prompt, with no additional explanation or content.
            This prompt should describe the scene from the camera's perspective, focusing on the traffic rule.
            Ensure the prompt is faithful to the original text and captures the key visual elements.
            Describe the scene from the camera's perspective mounted on the ego vehicle that must change its state (e.g., yield, stop). Focus on the visual elements of the road network and environment without explicitly mentioning the ego vehicle as a subject.
            IMPORTANT: Limit the prompt to a maximum of 50 words.
            Dot not show any independent vehicle contorl wards like slow down, yield,  prepare to stop in this prompt!.'''}
        # Worked examples shown to the model before the real request.
        few_shot = [
            {"role": "user", "content": """road_network: intersection, objects_environment: a bicycle rider"""},
            {"role": "assistant", "content": """You are driving approach to intersection, a bicycle rider on the road"""},
            {"role": "user", "content": """road_network: intersection, objects_environment: turn left sign"""},
            {"role": "assistant", "content": """You are driving approach to intersection, a turn left sign on the road"""},
            {"role": "user", "content": """road_network: construction zone, objects_environment: traffic control devices"""},
            {"role": "assistant", "content": """You are driving approach to construction zone, a traffic control devices on the road, """},
        ]
        return self.LLM([instructions, *few_shot, request])

    def LLM(self, messages):
        """Run the pipeline on a chat message list and return the reply text.

        Args:
            messages: List of ``{"role": ..., "content": ...}`` dicts.

        Returns:
            The ``content`` of the last message in the first output's
            ``generated_text``.
        """
        generation_options = {
            "max_new_tokens": self.max_new_tokens,
            "eos_token_id": self.terminators,
            "do_sample": self.do_sample,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "pad_token_id": self.pipeline.tokenizer.eos_token_id,
        }
        generated = self.pipeline(messages, **generation_options)
        final_message = generated[0]["generated_text"][-1]
        return final_message['content']

    def __call__(self, traffic_rule):
        """Run the full extraction for one traffic rule.

        Args:
            traffic_rule: Traffic-rule text passed to each ``find_*`` step.

        Returns:
            Dict with keys ``MR``, ``maneuver``, ``road_network``,
            ``object_environment`` and ``diffusion_prompt``.
        """
        # The three components come first; both later requests depend on them.
        maneuver = self.find_maneuver(traffic_rule)
        road_network = self.find_road_network(traffic_rule)
        object_environment = self.find_object_environment(traffic_rule)

        # Dict entries are evaluated top to bottom, so the MR request is
        # issued before the diffusion-prompt request.
        return {
            "MR": self.combine_to_MR(maneuver, road_network, object_environment),
            "maneuver": maneuver,
            "road_network": road_network,
            "object_environment": object_environment,
            "diffusion_prompt": self.find_prompt(road_network, object_environment),
        }

# Shared MRGenerator, created on first use by _get_generator().
_generator = None


def _get_generator():
    """Return the shared MRGenerator, constructing it on the first call."""
    global _generator
    if _generator is None:
        _generator = MRGenerator()
    return _generator


def generate_mr(traffic_rule):
    """Run the MR pipeline on one traffic rule and format the result as text.

    The generator is built once and reused, because constructing an
    ``MRGenerator`` loads the whole text-generation pipeline; doing that on
    every request would reload the model each time.

    Args:
        traffic_rule: Traffic-rule text to analyse.

    Returns:
        A multi-line string with the MR, its three components and the
        diffusion prompt.
    """
    results = _get_generator()(traffic_rule)
    # Format the output as a string
    output = f"""MR:
{results['MR']}

Components:
Maneuver: {results['maneuver']}
Road Network: {results['road_network']}
Object/Environment: {results['object_environment']}

Diffusion Prompt:
{results['diffusion_prompt']}"""
    return output

# Gradio UI: one text box for the traffic rule, one for the formatted result,
# with generate_mr as the handler.
_rule_box = gr.Textbox(label="Enter Traffic Rule", lines=3)
_result_box = gr.Textbox(label="Generated Output", lines=10)
demo = gr.Interface(
    fn=generate_mr,
    inputs=_rule_box,
    outputs=_result_box,
    title="Traffic Rule MR Generator",
    description="Enter a traffic rule to generate its corresponding MR and components.",
)

# Start the web app only when this file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch()