|
import tensorflow as tf
|
|
import numpy as np
|
|
import faiss
|
|
|
|
class MultiModalTransformer(tf.keras.Model):
    """Transformer trunk shared by five tasks, plus a small chat wrapper.

    Every task has its own input path and output head; all of them pass
    through the same hash/quantisation bottleneck and the same stack of
    ``TransformerBlock`` layers.  The supported task names are listed in
    ``TASKS``.

    The instance also keeps a conversation history and a table of
    "personality" weights that decide which canned phrases get prepended to
    a generated response.

    NOTE(review): this file defines no tokenizer, so the chat helpers
    (``conversation`` / ``generate_response``) cannot turn text into token
    ids or logits back into text.  See the notes on those methods.
    """

    # Task names accepted by ``call`` and ``pipe``.
    TASKS = (
        'speech_recognition',
        'image_captioning',
        'music_generation',
        'text_generation',
        'anomaly_detection',
    )

    # Tasks whose input sequence is extended with retrieved knowledge.
    RETRIEVAL_TASKS = ('text_generation', 'image_captioning')

    # A trait contributes its phrase only when its weight exceeds this value.
    TRAIT_THRESHOLD = 0.5

    # Phrase prepended for each trait.  The trailing separator is part of
    # the phrase because it differs between traits (". " vs ", " vs " ").
    _TRAIT_PREFIXES = {
        'kindness': "I understand your concern. ",
        'honesty': "To be honest, ",
        'resilience': "Let's keep trying. ",
        'open_mindedness': "That's an interesting perspective. ",
        'empathy': "I can see how you feel. ",
        'reliability': "You can count on me. ",
        'humility': "I'm still learning. ",
        'positivity': "Let's stay positive. ",
        'courage': "Let's face this together. ",
        'curiosity': "That's fascinating. ",
        'humor': "On a lighter note, ",
        'self_discipline': "Let's stay focused. ",
        'emotional_stability': "Let's stay calm. ",
        'assertiveness': "I firmly believe that ",
        'creativity': "Let's think outside the box. ",
    }

    def __init__(self, hparams, knowledge_base, n_hash=1024, n_quant=256):
        """Build all sub-layers.

        Args:
            hparams: object exposing ``n_vocab``, ``n_ctx``, ``n_embd``,
                ``n_head`` and ``n_layer``.
            knowledge_base: sequence of dicts with a ``'vector'`` entry (and
                a ``'text'`` entry used by the retriever).  The vectors are
                appended to the token sequence during retrieval, so they
                must be ``hparams.n_embd`` wide.
            n_hash: width of the first bottleneck layer.
            n_quant: width of the second bottleneck layer.
        """
        super().__init__()
        self.hparams = hparams
        self.n_hash = n_hash
        self.n_quant = n_quant

        # --- shared trunk -------------------------------------------------
        self.wte = tf.keras.layers.Embedding(hparams.n_vocab, hparams.n_embd)
        self.wpe = tf.keras.layers.Embedding(hparams.n_ctx, hparams.n_embd)
        self.hash_layer = tf.keras.layers.Dense(n_hash, activation='relu')
        self.quant_layer = tf.keras.layers.Dense(n_quant, activation='relu')
        # The transformer blocks add residuals of width n_embd, so the
        # bottleneck output has to be brought back to that width whenever
        # n_quant differs from it (with the defaults: 256 vs n_embd).
        self.bottleneck_proj = (
            tf.keras.layers.Dense(hparams.n_embd)
            if n_quant != hparams.n_embd else None
        )
        self.h = [
            TransformerBlock(hparams.n_embd, hparams.n_head)
            for _ in range(hparams.n_layer)
        ]
        self.ln_f = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.fc = tf.keras.layers.Dense(hparams.n_vocab, use_bias=False)

        # --- audio --------------------------------------------------------
        self.audio_encoder = tf.keras.Sequential([
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.GlobalAveragePooling1D(),
            tf.keras.layers.Dense(hparams.n_embd)
        ])

        # --- image --------------------------------------------------------
        self.image_encoder = tf.keras.applications.ResNet50(include_top=False, weights='imagenet')
        # Created once here instead of on every forward pass.
        self.image_pool = tf.keras.layers.GlobalAveragePooling2D()
        self.image_proj = tf.keras.layers.Dense(hparams.n_embd)

        # --- music --------------------------------------------------------
        self.pitch_embedding = tf.keras.layers.Embedding(128, hparams.n_embd)
        self.duration_embedding = tf.keras.layers.Embedding(32, hparams.n_embd)
        self.velocity_embedding = tf.keras.layers.Embedding(128, hparams.n_embd)

        # --- anomaly detection -------------------------------------------
        self.anomaly_threshold = tf.Variable(0.5, trainable=False)

        # --- retrieval ----------------------------------------------------
        self.knowledge_base = knowledge_base
        # The index must have the same width as the encoded query, which is
        # n_embd (see query_encoder below), not a fixed 768.
        self.retriever = FAISSRetriever(knowledge_base, dim=hparams.n_embd)
        self.query_encoder = tf.keras.Sequential([
            tf.keras.layers.Dense(hparams.n_embd, activation='relu'),
            tf.keras.layers.Dense(hparams.n_embd)
        ])
        # Dense copy of the stored vectors, row i == knowledge_base[i], so
        # retrieved ids can be gathered inside the graph.
        if len(knowledge_base) > 0:
            self._knowledge_vectors = tf.constant(
                np.stack([doc['vector'] for doc in knowledge_base]).astype(np.float32)
            )
        else:
            self._knowledge_vectors = None

        # --- output heads -------------------------------------------------
        self.speech_output = tf.keras.layers.Dense(hparams.n_vocab)
        self.caption_output = tf.keras.layers.Dense(hparams.n_vocab)
        # 288 == 128 + 32 + 128, the combined sizes of the three music
        # embedding tables above (presumably one logit per entry; confirm).
        self.music_output = tf.keras.layers.Dense(288)
        # NOTE(review): anomaly_output is never used by call(); kept so the
        # attribute stays available to existing code.
        self.anomaly_output = tf.keras.layers.Dense(1, activation='sigmoid')
        # Maps trunk output back to feature width so it can be compared
        # with the anomaly-detection input.
        self.anomaly_recon = tf.keras.layers.Dense(hparams.n_embd)

        # --- conversation state ------------------------------------------
        self.conversation_history = []

        self.personality_traits = {
            'kindness': 0.9,
            'honesty': 0.9,
            'resilience': 0.8,
            'open_mindedness': 0.8,
            'empathy': 0.9,
            'reliability': 0.9,
            'humility': 0.8,
            'positivity': 0.9,
            'courage': 0.8,
            'curiosity': 0.9,
            'humor': 0.8,
            'self_discipline': 0.8,
            'emotional_stability': 0.8,
            'assertiveness': 0.8,
            'creativity': 0.9
        }

    # ------------------------------------------------------------------
    # Forward pass
    # ------------------------------------------------------------------

    def _embed_inputs(self, inputs, task):
        """Turn task-specific ``inputs`` into a (batch, seq, n_embd) tensor.

        Raises:
            ValueError: if ``task`` is not one of ``TASKS``.
        """
        if task == 'speech_recognition':
            # audio_encoder ends in global average pooling, so it yields one
            # vector per example; give it a length-1 sequence axis so the
            # positional embedding and attention below see rank 3.
            return self.audio_encoder(inputs)[:, tf.newaxis, :]
        if task == 'image_captioning':
            image, text = inputs
            image_features = self.image_proj(self.image_pool(self.image_encoder(image)))
            # The pooled image vector becomes the first "token".
            return tf.concat([image_features[:, tf.newaxis, :], self.wte(text)], axis=1)
        if task == 'music_generation':
            pitch, duration, velocity = inputs
            return (
                self.pitch_embedding(pitch)
                + self.duration_embedding(duration)
                + self.velocity_embedding(velocity)
            )
        if task == 'text_generation':
            return self.wte(inputs)
        if task == 'anomaly_detection':
            features = tf.convert_to_tensor(inputs)
            if features.dtype.is_integer:
                # Token ids: embed them like text.
                return self.wte(features)
            # Continuous features are used as-is and therefore must already
            # be n_embd wide.
            return tf.cast(features, self.compute_dtype)
        raise ValueError(f"Unknown task: {task}")

    def _retrieve_context(self, query):
        """Return stored knowledge vectors nearest to ``query``.

        ``query`` is (batch, n_embd).  The result is
        (batch, retriever.num_results, n_embd); slots for which the index
        reported no hit (id -1) are filled with zeros.

        The stored vectors are used directly because the documents' text
        cannot be embedded without a tokenizer.
        NOTE(review): confirm this is the intended form of augmentation.
        """
        encoded_query = self.query_encoder(query)
        num_results = self.retriever.num_results
        index = self.retriever.index

        def _search(vectors):
            _, found = index.search(
                np.ascontiguousarray(vectors, dtype=np.float32), num_results
            )
            return found.astype(np.int64)

        # The FAISS lookup runs in Python, so it is wrapped to stay usable
        # when this method is traced into a graph.
        indices = tf.numpy_function(_search, [encoded_query], tf.int64)
        indices = tf.ensure_shape(indices, [None, num_results])

        hit = indices >= 0
        context = tf.gather(self._knowledge_vectors, tf.maximum(indices, 0))
        return context * tf.cast(hit, context.dtype)[..., tf.newaxis]

    def call(self, inputs, task):
        """Run the model for ``task``.

        Args:
            inputs: task dependent --
                * ``'speech_recognition'``: input accepted by the Conv1D
                  audio encoder;
                * ``'image_captioning'``: ``(image, text_token_ids)``;
                * ``'music_generation'``: ``(pitch, duration, velocity)``
                  integer tensors;
                * ``'text_generation'``: token ids;
                * ``'anomaly_detection'``: token ids, or float features
                  that are ``n_embd`` wide.
            task: one of ``TASKS``.

        Returns:
            The output of the task's head.  For ``'anomaly_detection'`` a
            pair ``(reconstruction, anomaly_scores)`` where the scores are
            1.0 where the per-position mean squared reconstruction error
            exceeds ``anomaly_threshold`` and 0.0 elsewhere.

        Raises:
            ValueError: if ``task`` is unknown.
        """
        x = self._embed_inputs(inputs, task)
        # What the anomaly head tries to reconstruct: the features before
        # positions or retrieval are mixed in.
        target = x

        if task in self.RETRIEVAL_TASKS and self._knowledge_vectors is not None:
            context = self._retrieve_context(x[:, 0, :])
            x = tf.concat([x, tf.cast(context, x.dtype)], axis=1)

        # tf.shape (not x.shape) so an unknown sequence length still works.
        # NOTE(review): sequences longer than hparams.n_ctx index past the
        # end of the position table.
        position = tf.range(tf.shape(x)[1], dtype=tf.int32)[tf.newaxis, :]
        x = x + self.wpe(position)

        x = self.hash_layer(x)
        x = self.quant_layer(x)
        if self.bottleneck_proj is not None:
            x = self.bottleneck_proj(x)
        for block in self.h:
            x, _ = block(x)
        x = self.ln_f(x)

        if task == 'speech_recognition':
            return self.speech_output(x)
        if task == 'image_captioning':
            return self.caption_output(x)
        if task == 'music_generation':
            return self.music_output(x)
        if task == 'anomaly_detection':
            reconstruction = self.anomaly_recon(x)
            reconstruction_loss = tf.reduce_mean(
                tf.square(tf.cast(target, reconstruction.dtype) - reconstruction), axis=-1
            )
            reconstruction_loss = tf.cast(reconstruction_loss, self.anomaly_threshold.dtype)
            anomaly_scores = tf.where(reconstruction_loss > self.anomaly_threshold, 1.0, 0.0)
            return reconstruction, anomaly_scores
        return self.fc(x)

    def pipe(self, inputs, task):
        """Validate ``task`` and forward to ``call``.

        Raises:
            ValueError: if ``task`` is not one of ``TASKS``.
        """
        if task not in self.TASKS:
            raise ValueError(f"Unknown task: {task}")
        return self.call(inputs, task)

    # ------------------------------------------------------------------
    # Conversation helpers
    # ------------------------------------------------------------------

    def conversation(self, user_input):
        """Generate a response to ``user_input`` and record both in history.

        The history is only updated once generation has succeeded, so a
        failed call does not leave a dangling user turn behind.
        """
        pending_history = list(self.conversation_history) + [user_input]
        response = self.generate_response(pending_history)

        self.conversation_history.append(user_input)
        self.conversation_history.append(response)
        return response

    def generate_response(self, conversation_history):
        """Run text generation over the concatenated history.

        The entries are concatenated along axis 0 and passed to the
        ``'text_generation'`` task, whose output is then decorated by
        ``apply_personality_traits``.

        NOTE(review): the returned value is an f-string built around the
        model's raw output tensor, not decoded text, and ``conversation``
        stores that string in the history -- so a second turn will hit the
        ``TypeError`` below.  A tokenizer/decoder is needed to make this
        path meaningful.

        Raises:
            TypeError: if any history entry is raw text; the embedding layer
                needs integer token ids.
        """
        if any(isinstance(turn, (str, bytes)) for turn in conversation_history):
            raise TypeError(
                "conversation history contains raw text, but this model has no "
                "tokenizer; pass token-id tensors instead"
            )

        conversation_input = tf.concat(conversation_history, axis=0)
        response = self.pipe(conversation_input, task='text_generation')
        return self.apply_personality_traits(response)

    def apply_personality_traits(self, response):
        """Apply every known trait's phrase to ``response`` in table order.

        Traits are visited in the order of ``personality_traits``; each one
        wraps the result of the previous, so the last trait's phrase ends up
        outermost.  Keys without a matching phrase are ignored.
        """
        for trait, value in self.personality_traits.items():
            # Dispatch only for known traits: a blind getattr on "add_<key>"
            # could otherwise resolve to unrelated Keras methods.
            if trait in self._TRAIT_PREFIXES:
                response = getattr(self, f"add_{trait}")(response, value)
        return response

    def _with_prefix(self, trait, response, value):
        """Prepend ``trait``'s phrase when ``value`` exceeds the threshold."""
        if value > self.TRAIT_THRESHOLD:
            return f"{self._TRAIT_PREFIXES[trait]}{response}"
        return response

    def add_kindness(self, response, value):
        """Prepend the kindness phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('kindness', response, value)

    def add_honesty(self, response, value):
        """Prepend the honesty phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('honesty', response, value)

    def add_resilience(self, response, value):
        """Prepend the resilience phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('resilience', response, value)

    def add_open_mindedness(self, response, value):
        """Prepend the open-mindedness phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('open_mindedness', response, value)

    def add_empathy(self, response, value):
        """Prepend the empathy phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('empathy', response, value)

    def add_reliability(self, response, value):
        """Prepend the reliability phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('reliability', response, value)

    def add_humility(self, response, value):
        """Prepend the humility phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('humility', response, value)

    def add_positivity(self, response, value):
        """Prepend the positivity phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('positivity', response, value)

    def add_courage(self, response, value):
        """Prepend the courage phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('courage', response, value)

    def add_curiosity(self, response, value):
        """Prepend the curiosity phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('curiosity', response, value)

    def add_humor(self, response, value):
        """Prepend the humor phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('humor', response, value)

    def add_self_discipline(self, response, value):
        """Prepend the self-discipline phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('self_discipline', response, value)

    def add_emotional_stability(self, response, value):
        """Prepend the emotional-stability phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('emotional_stability', response, value)

    def add_assertiveness(self, response, value):
        """Prepend the assertiveness phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('assertiveness', response, value)

    def add_creativity(self, response, value):
        """Prepend the creativity phrase if ``value`` > ``TRAIT_THRESHOLD``."""
        return self._with_prefix('creativity', response, value)

    def fine_tune_personality(self, trait, value):
        """Set the weight of an existing trait.

        Raises:
            ValueError: if ``trait`` is not a key of ``personality_traits``.
        """
        if trait not in self.personality_traits:
            raise ValueError(f"Unknown trait: {trait}")
        self.personality_traits[trait] = value

    def safe_word_format(self, user_input):
        """Handle the "stop" / "reset" safe words (case-insensitive).

        Returns:
            A confirmation message after clearing the conversation history,
            or ``None`` when ``user_input`` is not a safe word.
        """
        confirmations = {
            "stop": "Conversation stopped. You can start a new conversation.",
            "reset": "Conversation reset. Let's start fresh.",
        }
        message = confirmations.get(user_input.lower())
        if message is not None:
            self.conversation_history = []
        return message
|
|
|
|
class TransformerBlock(tf.keras.layers.Layer):
    """One residual block: layer-normed attention, then a layer-normed MLP.

    Both sub-layers are applied to a normalised copy of the input and their
    output is added back onto the un-normalised stream.
    """

    def __init__(self, n_embd, n_head):
        """Create the attention, MLP and normalisation sub-layers.

        Args:
            n_embd: width of the residual stream; the MLP expands to
                ``4 * n_embd`` and projects back.
            n_head: number of attention heads.
        """
        super().__init__()
        # Sub-layers are created in the same order as before so that Keras
        # assigns them the same names and weight ordering.
        self.attn = MultiHeadAttention(n_embd, n_head)
        self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        expand = tf.keras.layers.Dense(4 * n_embd, activation=gelu)
        contract = tf.keras.layers.Dense(n_embd)
        self.mlp = tf.keras.Sequential([expand, contract])
        self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=1e-5)

    def call(self, x, past=None):
        """Apply the block.

        Args:
            x: input tensor.
            past: optional cached key/value pair handed to the attention
                layer.

        Returns:
            ``(output, present)`` where ``present`` is whatever the
            attention layer returns as its second value.
        """
        attended, present = self.attn(self.ln_1(x), past=past)
        after_attention = x + attended
        after_mlp = after_attention + self.mlp(self.ln_2(after_attention))
        return after_mlp, present
|
|
|
|
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product self-attention with optional KV cache.

    NOTE(review): no attention mask is applied, so every position attends
    to every other position (including later ones).  If this is meant to be
    used for autoregressive generation, a causal mask is probably needed.
    """

    def __init__(self, n_embd, n_head):
        """Create the fused QKV projection and the output projection.

        Args:
            n_embd: model width.
            n_head: number of heads; must divide ``n_embd`` evenly.

        Raises:
            ValueError: if ``n_embd`` is not a multiple of ``n_head``.
        """
        super().__init__()
        if n_embd % n_head != 0:
            raise ValueError(
                f"n_embd ({n_embd}) must be divisible by n_head ({n_head})"
            )
        self.n_embd = n_embd
        self.n_head = n_head
        self.c_attn = tf.keras.layers.Dense(3 * n_embd)
        self.c_proj = tf.keras.layers.Dense(n_embd)

    def split_heads(self, x):
        """Reshape (batch, seq, n_embd) to (batch, n_head, seq, head_dim)."""
        # Dynamic shape so unknown batch / sequence sizes work in graph mode;
        # the head sizes are given as Python ints so the static feature
        # dimension stays known.
        dims = tf.shape(x)
        head_dim = self.n_embd // self.n_head
        per_head = tf.reshape(x, [dims[0], dims[1], self.n_head, head_dim])
        return tf.transpose(per_head, [0, 2, 1, 3])

    def merge_heads(self, x):
        """Reshape (batch, n_head, seq, head_dim) back to (batch, seq, n_embd)."""
        # Transpose first, then take the (batch, seq) dims from the
        # transposed tensor; using the pre-transpose leading dims would fold
        # the sequence axis into the feature axis.
        per_position = tf.transpose(x, [0, 2, 1, 3])
        dims = tf.shape(per_position)
        return tf.reshape(per_position, [dims[0], dims[1], self.n_embd])

    def call(self, x, past=None):
        """Attend over ``x`` (and any cached keys/values).

        Args:
            x: tensor of shape (batch, seq, n_embd).
            past: optional ``(keys, values)`` pair in split-head layout; it
                is prepended along the sequence axis.

        Returns:
            ``(output, present)``: the projected attention output with the
            same shape as ``x``, and the keys/values actually used, stacked
            along axis 1.
        """
        q, k, v = tf.split(self.c_attn(x), 3, axis=-1)
        q, k, v = map(self.split_heads, [q, k, v])

        if past is not None:
            past_k, past_v = past
            k = tf.concat([past_k, k], axis=-2)
            v = tf.concat([past_v, v], axis=-2)

        present = tf.stack([k, v], axis=1)

        # Scale in the tensor's own dtype so reduced-precision inputs do not
        # hit a dtype mismatch in the division.
        head_dim = self.n_embd // self.n_head
        scores = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(head_dim, q.dtype))
        weights = tf.nn.softmax(scores)
        attended = self.merge_heads(tf.matmul(weights, v))
        return self.c_proj(attended), present
|
|
|
|
class FAISSRetriever:
    """Exact L2 nearest-neighbour lookup over a list of knowledge documents."""

    def __init__(self, knowledge_base, dim=768, num_results=5):
        """Index every document's vector.

        Args:
            knowledge_base: sequence of dicts with ``'vector'`` and
                ``'text'`` entries.  Position in this sequence is the id
                stored in the index.
            dim: dimensionality of the vectors.
            num_results: number of neighbours requested per query.
        """
        self.index = faiss.IndexFlatL2(dim)
        self.knowledge_base = knowledge_base
        self.num_results = num_results

        # Nothing to add for an empty knowledge base (an empty array would
        # not have the 2-D shape the index expects).
        if len(knowledge_base) > 0:
            # FAISS works on contiguous float32 data; NumPy defaults to
            # float64, so convert explicitly.
            vectors = np.ascontiguousarray(
                [doc['vector'] for doc in knowledge_base], dtype=np.float32
            )
            self.index.add(vectors)

    def retrieve(self, query_vector):
        """Return the texts of the nearest documents for the first query.

        Args:
            query_vector: a tensor or array holding one query (1-D) or a
                batch of queries (2-D).  Only the first row's neighbours are
                returned.

        Returns:
            A 1-D string tensor with at most ``num_results`` entries; it is
            shorter when the index holds fewer documents than requested.
        """
        if hasattr(query_vector, 'numpy'):
            query_vector = query_vector.numpy()
        queries = np.ascontiguousarray(np.atleast_2d(query_vector), dtype=np.float32)

        if queries.shape[0] == 0:
            return tf.constant([], dtype=tf.string)

        _, indices = self.index.search(queries, self.num_results)
        # FAISS pads missing neighbours with -1, which as a Python index
        # would silently select the last document -- drop those slots.
        retrieved_docs = [
            self.knowledge_base[i]['text'] for i in indices[0] if i >= 0
        ]
        # Explicit dtype keeps an empty result a string tensor.
        return tf.constant(retrieved_docs, dtype=tf.string)
|
|
|
|
def gelu(x):
    """Tanh-based approximation of the GELU activation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise.
    """
    cubic_term = 0.044715 * tf.pow(x, 3)
    tanh_arg = np.sqrt(2 / np.pi) * (x + cubic_term)
    return 0.5 * x * (1 + tf.tanh(tanh_arg))
|
|
|
|
|
|
def custom_loss(y_true, y_pred, model, task, l2_weight=0.01):
    """Return a scalar training loss for ``task``.

    Args:
        y_true: targets.
        y_pred: model output.  For ``'anomaly_detection'`` this may be the
            ``(reconstruction, anomaly_scores)`` pair; only the
            reconstruction is scored.
        model: model whose trainable weights are L2-regularised (not used
            for ``'anomaly_detection'``).
        task: task name.
        l2_weight: multiplier of the L2 penalty.

    Returns:
        Mean squared error for ``'anomaly_detection'``; otherwise the mean
        sparse categorical cross-entropy (from logits) plus
        ``l2_weight`` times the summed ``tf.nn.l2_loss`` of the weights.
    """
    if task == 'anomaly_detection':
        # Score only the reconstruction when the model output is a pair.
        reconstruction = y_pred[0] if isinstance(y_pred, (tuple, list)) else y_pred
        mse = tf.keras.losses.MeanSquaredError()
        return mse(y_true, reconstruction)

    # Reduce to a scalar so the penalty is added once rather than being
    # broadcast into every per-token loss value.
    ce_loss = tf.reduce_mean(
        tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
    )
    penalties = [tf.nn.l2_loss(w) for w in model.trainable_weights]
    reg_loss = tf.add_n(penalties) if penalties else 0.0
    return ce_loss + l2_weight * reg_loss
|
|
|
|
|
|
@tf.function
def train_step(model, optimizer, inputs, targets, task):
    """Run one optimisation step and return the loss.

    Args:
        model: callable as ``model(inputs, task)`` and exposing
            ``trainable_variables``.
        optimizer: optimizer providing ``apply_gradients``.
        inputs: model inputs for ``task``.
        targets: targets passed to ``custom_loss``.
        task: task name; it is compared against string literals with plain
            ``if`` statements downstream, so pass a Python ``str``.

    NOTE(review): layers used by only some tasks are built lazily on first
    use; confirm that this works with ``tf.function`` tracing when several
    tasks are trained through this function.
    """
    with tf.GradientTape() as tape:
        predictions = model(inputs, task)
        loss = custom_loss(targets, predictions, model, task)

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    # Variables that did not take part in this task's forward pass have no
    # gradient; leave them out instead of handing None to the optimizer.
    updates = [
        (grad, var) for grad, var in zip(gradients, variables) if grad is not None
    ]
    optimizer.apply_gradients(updates)
    return loss
|
|
|
|
|
|
class HParams:
    """Plain container for the model's size hyper-parameters."""

    def __init__(self, n_vocab, n_ctx, n_embd, n_head, n_layer):
        """Store each argument on the instance under its own name.

        Args:
            n_vocab: vocabulary size.
            n_ctx: number of positions in the position table.
            n_embd: embedding width.
            n_head: number of attention heads.
            n_layer: number of transformer blocks.
        """
        self.n_vocab, self.n_ctx = n_vocab, n_ctx
        self.n_embd, self.n_head, self.n_layer = n_embd, n_head, n_layer
|
|
|
|
# ---------------------------------------------------------------------------
# Example configuration and usage
# ---------------------------------------------------------------------------

hparams = HParams(
    n_vocab=50000,
    n_ctx=1024,
    n_embd=768,
    n_head=12,
    n_layer=12
)

# Placeholder knowledge base.  FAISS stores float32 vectors, while
# np.random.rand produces float64, so convert up front.
knowledge_base = [
    {'text': 'Example knowledge 1', 'vector': np.random.rand(768).astype(np.float32)},
    {'text': 'Example knowledge 2', 'vector': np.random.rand(768).astype(np.float32)},
]

model = MultiModalTransformer(hparams, knowledge_base)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# --- training ---------------------------------------------------------------
# NOTE(review): `dataset` is not defined anywhere in this file.  It is picked
# up from the surrounding namespace when present (e.g. defined earlier in a
# notebook); otherwise training is skipped instead of raising NameError.
# Each batch is expected to unpack into (inputs, targets, task).
dataset = globals().get('dataset', ())

num_epochs = 10
for epoch in range(num_epochs):
    loss = None
    for batch in dataset:
        inputs, targets, task = batch
        # The model branches on `task` with plain string comparisons, so
        # turn a scalar string tensor / bytes value into a Python str.
        if hasattr(task, 'numpy'):
            task = task.numpy()
        if isinstance(task, bytes):
            task = task.decode('utf-8')
        loss = train_step(model, optimizer, inputs, targets, task)
    if loss is None:
        print(f"Epoch {epoch + 1}: no batches, training skipped")
    else:
        print(f"Epoch {epoch + 1}, Loss: {loss.numpy()}")

# --- per-task inference examples -------------------------------------------
speech_input = tf.random.normal((1, 16000, 1))
speech_output = model(speech_input, task='speech_recognition')

image_input = tf.random.normal((1, 224, 224, 3))
text_input = tf.random.uniform((1, 10), maxval=50000, dtype=tf.int32)
caption_output = model([image_input, text_input], task='image_captioning')

music_input = [
    tf.random.uniform((1, 100), maxval=128, dtype=tf.int32),
    tf.random.uniform((1, 100), maxval=32, dtype=tf.int32),
    tf.random.uniform((1, 100), maxval=128, dtype=tf.int32)
]
music_output = model(music_input, task='music_generation')

text_input = tf.random.uniform((1, 50), maxval=50000, dtype=tf.int32)
text_output = model(text_input, task='text_generation')

anomaly_input = tf.random.normal((1, 100, 768))
reconstructed, anomalies = model(anomaly_input, task='anomaly_detection')

# --- conversation example ---------------------------------------------------
# NOTE(review): the model embeds integer token ids and this file contains no
# tokenizer, so passing raw text here is expected to fail until one is added.
user_input = "Hello, how are you?"
response = model.conversation(user_input)
print(response)

# --- personality tuning -----------------------------------------------------
model.fine_tune_personality('kindness', 0.95)

# --- safe words -------------------------------------------------------------
user_input = "stop"
response = model.safe_word_format(user_input)
print(response)