noding / parser.py
broadfield-dev's picture
Update parser.py
5bed3d1 verified
import ast
import hashlib
def get_category_id(category):
    """Return the integer code used for a node category in vector embeddings.

    Categories that are not recognised fall back to 0, the code of 'unknown'.
    """
    # A name's position in this tuple is its integer code, so the order matters.
    ordered_names = (
        'unknown', 'import', 'function', 'class',
        'if', 'while', 'for', 'try', 'expression',
        'spacer', 'elif', 'else', 'except',
        'return', 'assigned_variable', 'variable_def',
    )
    codes = {label: code for code, label in enumerate(ordered_names)}
    return codes.get(category, 0)
def create_vector(category, level, location, total_lines, parent_path):
    """
    Build the 6-element feature vector describing one node.

    Layout of the returned list:
        [category id, level, centre line / total lines,
         line span / total lines, number of ancestors, ancestry weight]

    ``location`` is a (start_line, end_line) pair and ``parent_path`` is the
    sequence of ancestor id strings. The three fractional entries are rounded
    to 4 decimals to keep the serialized JSON small.
    """
    category_code = get_category_id(category)
    first_line, last_line = location
    line_count = max(1, total_lines)  # never divide by zero on empty input

    # Position metrics, both expressed relative to the file length.
    relative_span = (last_line - first_line + 1) / line_count
    relative_center = ((first_line + last_line) / 2) / line_count

    # Ancestry weight: md5 of the concatenated ancestor ids, folded into
    # one of 100 buckets in [0.00, 0.99].
    digest = hashlib.md5("".join(parent_path).encode()).hexdigest()
    ancestry_weight = (int(digest, 16) % 100) / 100.0

    return [
        category_code,
        level,
        round(relative_center, 4),
        round(relative_span, 4),
        len(parent_path),
        round(ancestry_weight, 4),
    ]
def parse_source_to_graph(code):
    """
    Parse Python source into a flat list of graph nodes plus parent/child edges.

    Only the statement kinds listed in ``rules`` below are recorded as nodes.
    Every other AST node is transparent: it is not emitted, but its children
    are still visited and attach to the nearest recorded ancestor.

    Args:
        code: Python source text.

    Returns:
        On success ``{"nodes": [...], "connections": [...]}``:
          * each node is a dict with keys ``id``, ``lbl`` (label), ``type``
            (category), ``loc`` ([start_line, end_line]), ``vec`` (the list
            returned by ``create_vector``), ``lvl`` (nesting level among
            recorded nodes) and ``pid`` (id of the nearest recorded ancestor,
            or None); nodes are sorted by start line;
          * each connection is ``{"f": parent_id, "t": child_id}``.
        On unparseable input ``{"error": "<message>"}``.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return {"error": f"Syntax Error on line {e.lineno}: {e.msg}"}
    except ValueError as e:
        # Older Python versions raise ValueError (not SyntaxError) for source
        # containing null bytes; report it through the same error channel.
        return {"error": f"Invalid source: {e}"}

    total_lines = len(code.splitlines())
    nodes = []

    # (AST node types, category, fixed label). A fixed label of None means
    # "use the node's own ``name`` attribute" (functions and classes).
    rules = (
        ((ast.FunctionDef, ast.AsyncFunctionDef), 'function', None),
        ((ast.ClassDef,), 'class', None),
        ((ast.If,), 'if', "if"),
        ((ast.For, ast.AsyncFor), 'for', "for"),
        ((ast.While,), 'while', "while"),
        ((ast.Return,), 'return', "return"),
        ((ast.Assign, ast.AnnAssign), 'assigned_variable', "assignment"),
        ((ast.Expr,), 'expression', "expr"),
        ((ast.Try,), 'try', "try"),
        ((ast.Import, ast.ImportFrom), 'import', "import"),
    )

    def classify(node):
        """Return (category, label) for a recorded node kind, else (None, None)."""
        for node_types, category, fixed_label in rules:
            if not isinstance(node, node_types):
                continue
            label = fixed_label or getattr(node, 'name', None) or category
            if category == 'assigned_variable':
                # Assign carries ``targets``; AnnAssign carries a single ``target``.
                targets = getattr(node, 'targets', []) or [getattr(node, 'target', None)]
                if targets and isinstance(targets[0], ast.Name):
                    label = f"{targets[0].id} ="
            return category, label
        return None, None

    def traverse(node, parent_path, level, parent_id):
        lineno = getattr(node, 'lineno', 0)
        category, label = classify(node) if lineno else (None, None)

        if category is None:
            # Transparent node. Keep descending even when the node has no
            # line number (e.g. a ``case`` clause of a ``match`` statement),
            # otherwise the statements nested inside it would be lost.
            for child in ast.iter_child_nodes(node):
                traverse(child, parent_path, level, parent_id)
            return

        end_lineno = getattr(node, 'end_lineno', lineno)
        node_id = f"{type(node).__name__}_{lineno}_{getattr(node, 'col_offset', 0)}"

        # Only 'loc' (start/end lines) is sent instead of the source text, and
        # keys are abbreviated, to reduce payload size.
        nodes.append({
            "id": node_id,
            "lbl": label,
            "type": category,
            "loc": [lineno, end_lineno],
            "vec": create_vector(category, level, (lineno, end_lineno), total_lines, parent_path),
            "lvl": level,
            "pid": parent_id,
        })

        child_path = parent_path + [node_id]
        for child in ast.iter_child_nodes(node):
            traverse(child, child_path, level + 1, node_id)

    for statement in tree.body:
        traverse(statement, [], 0, None)

    # Stable sort: nodes starting on the same line keep traversal order,
    # so a parent still precedes its children.
    nodes.sort(key=lambda n: n['loc'][0])

    node_ids = {n['id'] for n in nodes}
    connections = [
        {"f": n['pid'], "t": n['id']}
        for n in nodes
        if n['pid'] and n['pid'] in node_ids
    ]
    return {"nodes": nodes, "connections": connections}
def generate_connections(nodes):
    """
    Build structural (tree) edges from a list of node dicts.

    Args:
        nodes: dicts that each have an ``id`` and a parent reference stored
            under ``parent_id`` or under the shortened ``pid`` key that
            ``parse_source_to_graph`` emits.

    Returns:
        A list of ``{"from": parent_id, "to": node_id, "type": "hierarchy"}``
        dicts, one per node whose parent is itself present in ``nodes``.
    """
    known_ids = {n['id'] for n in nodes}
    connections = []
    for node in nodes:
        # Accept both key spellings so the parser's own nodes do not raise KeyError.
        parent = node.get('parent_id') or node.get('pid')
        if parent and parent in known_ids:
            connections.append({
                "from": parent,
                "to": node['id'],
                "type": "hierarchy",
            })
    return connections