gpicciuca committed · Commit 1e9eb0b · Parent: 058f1d9

Add missing documentation

app/tasks/inference.py CHANGED

@@ -8,15 +8,29 @@ import time
 from scipy.special import softmax
 
 # HuggingFace Model to be used for inferencing
-MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
+MODEL = "gpicciuca/twitter-roberta-base-sentiment-latest"
+# MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
 
 class InferenceTask:
 
+    """
+    Encapsulates the entire inferencing logic on top of HuggingFace's Transformers library.
+    Offers a convenient predict() method that returns a list of dictionaries, one per
+    evaluated message, each containing that message's sentiment analysis.
+    """
+
     def __init__(self):
         self.clear()
         self.load_model()
 
     def load_model(self):
+        """
+        Loads the classification model, its configuration and the tokenizer required to
+        pre-process any text that is run through inference later on.
+
+        Returns:
+            bool: True if loading succeeded, False otherwise
+        """
         try:
             self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)
             self.__config = AutoConfig.from_pretrained(MODEL)
@@ -30,15 +44,37 @@ class InferenceTask:
         return True
 
     def clear(self):
+        """
+        Resets the state of this instance.
+        """
         self.__is_loaded = False
         self.__tokenizer = None
         self.__config = None
         self.__model = None
 
     def is_loaded(self):
+        """
+        Checks whether this instance is ready for use, i.e. whether
+        a model has been loaded.
+
+        Returns:
+            bool: True if a model was loaded, False otherwise
+        """
         return self.__is_loaded
 
     def predict(self, messages: list[str]):
+        """
+        Performs sentiment classification on a list of messages.
+        Each inference run is logged in MLFlow under the experiment 'Sentiment Analysis'.
+        For efficiency, only the average over the whole bulk request is logged.
+
+        Args:
+            messages (list[str]): List of messages to classify
+
+        Returns:
+            list[dict]: A list of dictionaries where each element contains the probabilities
+                        for 'positive', 'neutral' and 'negative' sentiment.
+        """
         if len(messages) == 0:
             return None
 
@@ -69,6 +105,15 @@
         return labelized_scores
 
     def __calculate_mean_sentiment(self, labelized_scores: list):
+        """
+        Calculates the average sentiment over a list of classified messages.
+
+        Args:
+            labelized_scores (list): List of labelled scores resulting from the prediction step.
+
+        Returns:
+            dict: Dictionary with average values for 'positive', 'neutral' and 'negative'.
+        """
         total_samples = float(len(labelized_scores))
 
         mean_sentiment = {
@@ -88,8 +133,17 @@
 
         return mean_sentiment
 
-    # Preprocess text (username and link placeholders)
     def __preprocess(self, messages: list[str]):
+        """
+        Preprocesses the messages to strip out patterns that are not
+        required for inference: user tags and http links.
+
+        Args:
+            messages (list[str]): List of messages to preprocess
+
+        Returns:
+            list[str]: List of processed messages without user tags and links
+        """
         msg_list = []
         for message in messages:
             new_message = []
@@ -101,6 +155,17 @@
         return msg_list
 
    def __labelize(self, scores):
+        """
+        Helper method that maps the numeric labels produced by the
+        classification back to their human-readable textual equivalents,
+        using the model's configuration.
+
+        Args:
+            scores: Result from prediction for each individual message
+
+        Returns:
+            dict: Dictionary containing the sentiment prediction with human-readable labels
+        """
         output = {}
         ranking = np.argsort(scores)
         ranking = ranking[::-1]
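
For orientation, the documented predict() contract can be exercised through the infer_task singleton that training.py imports from this module. A minimal usage sketch follows; the sample messages and the printed dictionary values are illustrative assumptions, not part of the commit:

# Minimal usage sketch (message strings are made up)
from tasks.inference import infer_task

results = infer_task.predict([
    "Stocks rallied after the earnings beat.",
    "@user this looks bad https://example.com",  # user tags and links are stripped during preprocessing
])

if results is not None:  # predict() returns None for an empty input list
    for scores in results:
        # per the docstring, each element holds probabilities such as
        # {"positive": 0.91, "neutral": 0.07, "negative": 0.02}
        print(scores)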
app/tasks/training.py CHANGED

@@ -10,19 +10,17 @@ from transformers import (
     pipeline,
 )
 from huggingface_hub import login, logout
+from scipy.special import softmax
 
 import os
 import mlflow
 from tasks.inference import infer_task
 from config import is_test_mode
+import time
 
-"""
-Documentation:
-- https://huggingface.co/docs/transformers/en//training
-- https://mlflow.org/docs/latest/llms/transformers/tutorials/fine-tuning/transformers-fine-tuning
-"""
 
-MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
+# MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
+MODEL = "gpicciuca/twitter-roberta-base-sentiment-latest"
 DATASET = "zeroshot/twitter-financial-news-sentiment"
 HF_DEST_REPO = "financial-twitter-roberta-sentiment"
 
@@ -30,6 +28,17 @@ RNG_SEED = 22
 
 class TrainingTask:
 
+    """
+    Implements the sequence of actions that controls the training phase of the model.
+    The class overloads the call operator, which loads the existing model, loads
+    and prepares the datasets and proceeds with the training.
+    Upon completion, the new model is uploaded to the HuggingFace repo only
+    if its accuracy did not drop compared to the old model.
+
+    The class is managed as a singleton so that only one instance exists
+    at any time, unless one is allocated manually.
+    """
+
     TRAINING_TASK_INST_SINGLETON = None
 
     def __init__(self):
@@ -45,12 +54,28 @@
         self.__trainer = None
         self.__run_id = None
 
+        self.__old_accuracy = 0.0
+
     @staticmethod
     def has_instance():
+        """
+        Checks whether a global singleton instance is available.
+
+        Returns:
+            bool: True if an instance is available, False otherwise
+        """
         return TrainingTask.TRAINING_TASK_INST_SINGLETON is not None
 
     @staticmethod
     def get_instance():
+        """
+        Returns the globally allocated singleton instance.
+        The instance will be allocated by this method if none was
+        previously allocated.
+
+        Returns:
+            TrainingTask: Singleton instance
+        """
         if TrainingTask.TRAINING_TASK_INST_SINGLETON is None:
             TrainingTask.TRAINING_TASK_INST_SINGLETON = TrainingTask()
 
@@ -58,16 +83,37 @@
 
     @staticmethod
     def clear_instance():
+        """
+        Destroys the global instance.
+        """
         del TrainingTask.TRAINING_TASK_INST_SINGLETON
         TrainingTask.TRAINING_TASK_INST_SINGLETON = None
 
     def has_error(self):
+        """
+        Checks whether an error occurred during training.
+
+        Returns:
+            bool: True if an exception was raised, False otherwise
+        """
         return self.__has_error
 
     def is_done(self):
+        """
+        Checks whether the training is done.
+
+        Returns:
+            bool: True if done, False if still ongoing.
+        """
         return self.__is_done
 
     def __call__(self, *args, **kwds):
+        """
+        Callable overload for this class. Initiates the training sequence:
+        loads the existing model, loads and prepares the datasets, fine-tunes,
+        and compares performance against the old model on the test dataset.
+        """
+
         self.__has_error = False
         self.__is_done = False
 
@@ -83,6 +129,7 @@
             self.__load_datasets()
             self.__tokenize()
             self.__load_model()
+            self.__check_old_accuracy()
             self.__train()
             self.__evaluate()
             self.__deploy()
@@ -92,16 +139,25 @@
         finally:
             self.__is_done = True
 
+            if self.has_error():
+                logger.error("Training did not complete and terminated with an error")
+            else:
+                logger.info("Training completed")
+
             logout()
 
         self.__reload_inference_model()
 
-    def __load_datasets(self):
-        # Load the dataset.
+    def __load_datasets(self, test_size_ratio=0.2):
+        """
+        Loads the dataset and splits it into train and test sets.
+        """
+        assert 0.0 < test_size_ratio < 1.0
+
         dataset = load_dataset(DATASET)
 
-        # Split train/test by an 8/2 ratio.
-        dataset_train_test = dataset["train"].train_test_split(test_size=0.2)
+        # Split train/test by 'test_size_ratio'.
+        dataset_train_test = dataset["train"].train_test_split(test_size=test_size_ratio)
         self.__train_dataset = dataset_train_test["train"]
         self.__test_dataset = dataset_train_test["test"]
 
@@ -117,6 +173,12 @@
         self.__test_dataset = self.__test_dataset.map(label_filter)
 
     def __tokenize(self):
+        """
+        Loads the tokenizer previously used by the pretrained model
+        and uses it to tokenize the datasets, so that the input to the
+        model remains consistent with what it has seen in previous
+        trainings.
+        """
         # Load the tokenizer for the model.
         self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)
 
@@ -138,6 +200,9 @@
         self.__test_tokenized = self.__test_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)
 
     def __load_model(self):
+        """
+        Loads the model from the repository.
+        """
         # Set the mapping between int label and its meaning.
         id2label = {0: "Bearish", 1: "Neutral", 2: "Bullish"}
         label2id = {"Bearish": 0, "Neutral": 1, "Bullish": 2}
@@ -150,7 +215,32 @@
             id2label=id2label,
         )
 
+    def __check_old_accuracy(self):
+        """
+        Runs a prediction with the old model on the tokenized test dataset
+        to evaluate the model's accuracy.
+        """
+        trainer = Trainer(model=self.__model, tokenizer=self.__tokenizer)
+        output = trainer.predict(self.__test_tokenized)
+
+        # Get logits from the prediction output.
+        logits = output.predictions
+        # Convert logits to predicted class labels.
+        preds = np.argmax(logits, axis=1)
+        # Get the true labels.
+        labels = output.label_ids
+
+        # Compute accuracy.
+        self.__old_accuracy = (preds == labels).mean()
+        logger.info(f"Old model accuracy: {self.__old_accuracy:.4f}")
+
     def __train(self):
+        """
+        Performs the training/fine-tuning of the loaded model using the
+        tokenized train and test datasets.
+        The training run is logged on the MLFlow dashboard.
+        Uses the 'accuracy' metric to evaluate performance.
+        """
         # Define the target optimization metric
         metric = evaluate.load("accuracy")
 
@@ -161,23 +251,14 @@
             return metric.compute(predictions=predictions, references=labels)
 
         # Checkpoints will be output to this `training_output_dir`.
-        training_output_dir = "/tmp/sentiment_trainer"
+        training_output_dir = "./training_output"
         training_args = TrainingArguments(
             output_dir=training_output_dir,
             eval_strategy="epoch",
             per_device_train_batch_size=8,
             per_device_eval_batch_size=8,
             logging_steps=8,
-            num_train_epochs=3,
-        )
-
-        # Instantiate a `Trainer` instance that will be used to initiate a training run.
-        self.__trainer = Trainer(
-            model=self.__model,
-            args=training_args,
-            train_dataset=self.__train_tokenized,
-            eval_dataset=self.__test_tokenized,
-            compute_metrics=compute_metrics,
+            num_train_epochs=10,
         )
 
         mlflow.set_tracking_uri(os.environ["MLFLOW_ENDPOINT"])
@@ -185,62 +266,54 @@
 
         with mlflow.start_run() as run:
             self.__run_id = run.info.run_id
-            self.__trainer.train()
-
-    def __evaluate(self):
-        tuned_pipeline = pipeline(
-            task="text-classification",
-            model=self.__trainer.model,
-            batch_size=8,
-            tokenizer=self.__tokenizer,
-            device="cpu", # or cuda
-        )
-
-        quick_check = (
-            "I have a question regarding the project development timeline and allocated resources; "
-            "specifically, how certain are you that John and Ringo can work together on writing this next song? "
-            "Do we need to get Paul involved here, or do you truly believe, as you said, 'nah, they got this'?"
-        )
-
-        result = tuned_pipeline(quick_check)
-        logger.debug("Test evaluation of fine-tuned model: %s %.6f" % (result[0]["label"], result[0]["score"]))
-
-        # Define a set of parameters that we would like to be able to flexibly override at inference time, along with their default values
-        model_config = {"batch_size": 8}
-
-        # Infer the model signature, including a representative input, the expected output, and the parameters that we would like to be able to override at inference time.
-        signature = mlflow.models.infer_signature(
-            ["This is a test!", "And this is also a test."],
-            mlflow.transformers.generate_signature_output(
-                tuned_pipeline, ["This is a test response!", "So is this."]
-            ),
-            params=model_config,
-        )
-
-        # Log the pipeline to the existing training run
-        with mlflow.start_run(run_id=self.__run_id):
-            model_info = mlflow.transformers.log_model(
-                transformers_model=tuned_pipeline,
-                artifact_path="fine_tuned",
-                signature=signature,
-                input_example=["Pass in a string", "And have it mark as spam or not."],
-                model_config=model_config,
-            )
-
-        # Load our saved model in the native transformers format
-        loaded = mlflow.transformers.load_model(model_uri=model_info.model_uri)
-
-        # Define a test example that we expect to be classified as spam
-        validation_text = (
-            "Want to learn how to make MILLIONS with no effort? Click HERE now! See for yourself! Guaranteed to make you instantly rich! "
-            "Don't miss out you could be a winner!"
-        )
-
-        # validate the performance of our fine-tuning
-        loaded(validation_text)
+
+            logger.info("Initializing trainer...")
+            self.__trainer = Trainer(
+                model=self.__model,
+                args=training_args,
+                train_dataset=self.__train_tokenized,
+                eval_dataset=self.__test_tokenized,
+                compute_metrics=compute_metrics,
+            )
+            # Run the actual fine-tuning.
+            self.__trainer.train()
 
+    def __evaluate(self):
+        """
+        Evaluates the fine-tuned model's performance by comparing the new
+        accuracy with the old one over the same test dataset.
+        """
+        logger.info("Evaluating new model's performance")
 
+        with mlflow.start_run(run_id=self.__run_id):
+            output = self.__trainer.predict(self.__test_tokenized)
+
+            # Get logits from the prediction output.
+            logits = output.predictions
+            # Convert logits to predicted class labels.
+            preds = np.argmax(logits, axis=1)
+            # Get the true labels.
+            labels = output.label_ids
+
+            # Compute accuracy.
+            new_accuracy = (preds == labels).mean()
+            mlflow.log_metrics({
+                "old_accuracy": self.__old_accuracy,
+                "new_accuracy": new_accuracy
+            }, step=int(time.time()))
+
+            if self.__old_accuracy > new_accuracy:
+                raise Exception(f"New trained model's accuracy dropped {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
+            else:
+                logger.info(f"New trained model's accuracy {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
+
     def __deploy(self):
+        """
+        Uploads the fine-tuned model to HuggingFace.
+        """
         self.__trainer.push_to_hub(HF_DEST_REPO)
 
     def __reload_inference_model(self):
+        """
+        Reloads the model used by the inference class.
+        """
         infer_task.load_model()
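
A minimal sketch of how the documented singleton and callable interface compose, for example from a background thread; the threading wrapper and the polling interval are assumptions, not part of the commit:

import threading
import time

from tasks.training import TrainingTask

task = TrainingTask.get_instance()      # allocates the singleton on first use
threading.Thread(target=task).start()   # __call__ runs the whole training sequence

while not task.is_done():               # poll the documented status flags
    time.sleep(5)

if task.has_error():
    print("Training failed; the new model was not deployed")
else:
    print("Training succeeded; the new model was pushed to the Hub")

TrainingTask.clear_instance()           # drop the global instance when finished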
docker-compose.yaml CHANGED

@@ -1,5 +1,4 @@
 services:
-  #
   model_runner:
     build:
       context: .
@@ -19,6 +18,13 @@ services:
     entrypoint: ["/usr/bin/python3", "/app/main.py"]
     networks:
      - airflow_tracking_network
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: all
+              capabilities: [gpu]
 
 networks:
   airflow_tracking_network:
  airflow_tracking_network: