# Copyright 2022 The Music Spectrogram Diffusion Authors.
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import math
import os
from typing import Any, Callable, List, Mapping, MutableMapping, Optional, Sequence, Tuple, Union

import numpy as np
import torch
import torch.nn.functional as F

from ....utils import is_note_seq_available
from .pipeline_spectrogram_diffusion import TARGET_FEATURE_LENGTH


if is_note_seq_available():
    import note_seq
else:
    raise ImportError("Please install note-seq via `pip install note-seq`")


INPUT_FEATURE_LENGTH = 2048

SAMPLE_RATE = 16000
HOP_SIZE = 320
FRAME_RATE = int(SAMPLE_RATE // HOP_SIZE)

DEFAULT_STEPS_PER_SECOND = 100
DEFAULT_MAX_SHIFT_SECONDS = 10
DEFAULT_NUM_VELOCITY_BINS = 1

SLAKH_CLASS_PROGRAMS = {
    "Acoustic Piano": 0,
    "Electric Piano": 4,
    "Chromatic Percussion": 8,
    "Organ": 16,
    "Acoustic Guitar": 24,
    "Clean Electric Guitar": 26,
    "Distorted Electric Guitar": 29,
    "Acoustic Bass": 32,
    "Electric Bass": 33,
    "Violin": 40,
    "Viola": 41,
    "Cello": 42,
    "Contrabass": 43,
    "Orchestral Harp": 46,
    "Timpani": 47,
    "String Ensemble": 48,
    "Synth Strings": 50,
    "Choir and Voice": 52,
    "Orchestral Hit": 55,
    "Trumpet": 56,
    "Trombone": 57,
    "Tuba": 58,
    "French Horn": 60,
    "Brass Section": 61,
    "Soprano/Alto Sax": 64,
    "Tenor Sax": 66,
    "Baritone Sax": 67,
    "Oboe": 68,
    "English Horn": 69,
    "Bassoon": 70,
    "Clarinet": 71,
    "Pipe": 73,
    "Synth Lead": 80,
    "Synth Pad": 88,
}

@dataclasses.dataclass
class NoteRepresentationConfig:
    """Configuration for note representations."""

    onsets_only: bool
    include_ties: bool


@dataclasses.dataclass
class NoteEventData:
    pitch: int
    velocity: Optional[int] = None
    program: Optional[int] = None
    is_drum: Optional[bool] = None
    instrument: Optional[int] = None


@dataclasses.dataclass
class NoteEncodingState:
    """Encoding state for note transcription, keeping track of active pitches."""

    # velocity bin for active pitches and programs
    active_pitches: MutableMapping[Tuple[int, int], int] = dataclasses.field(default_factory=dict)


@dataclasses.dataclass
class EventRange:
    type: str
    min_value: int
    max_value: int


@dataclasses.dataclass
class Event:
    type: str
    value: int

class Tokenizer:
    def __init__(self, regular_ids: int):
        # The special tokens: 0=PAD, 1=EOS, and 2=UNK
        self._num_special_tokens = 3
        self._num_regular_tokens = regular_ids

    def encode(self, token_ids):
        encoded = []
        for token_id in token_ids:
            if not 0 <= token_id < self._num_regular_tokens:
                raise ValueError(
                    f"token_id {token_id} does not fall within valid range of [0, {self._num_regular_tokens})"
                )
            encoded.append(token_id + self._num_special_tokens)

        # Add EOS token
        encoded.append(1)

        # Pad out to INPUT_FEATURE_LENGTH
        encoded = encoded + [0] * (INPUT_FEATURE_LENGTH - len(encoded))

        return encoded
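
# Illustrative example (comment-only sketch, not executed at import time): with the
# three special tokens (PAD=0, EOS=1, UNK=2), a regular token id of 5 is shifted to 8,
# the sequence is EOS-terminated, then zero-padded to INPUT_FEATURE_LENGTH:
#
#     tokenizer = Tokenizer(regular_ids=10)
#     tokenizer.encode([5])  # -> [8, 1, 0, 0, ..., 0]  (length 2048)
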
class Codec:
    """Encode and decode events.

    Useful for declaring what certain ranges of a vocabulary should be used for. This is intended to be used from
    Python before encoding or after decoding with GenericTokenVocabulary. This class is more lightweight and does not
    include things like EOS or UNK token handling.

    To ensure that 'shift' events are always the first block of the vocab and start at 0, that event type is required
    and specified separately.
    """

    def __init__(self, max_shift_steps: int, steps_per_second: float, event_ranges: List[EventRange]):
        """Define Codec.

        Args:
          max_shift_steps: Maximum number of shift steps that can be encoded.
          steps_per_second: Shift steps will be interpreted as having a duration of
              1 / steps_per_second.
          event_ranges: Other supported event types and their ranges.
        """
        self.steps_per_second = steps_per_second
        self._shift_range = EventRange(type="shift", min_value=0, max_value=max_shift_steps)
        self._event_ranges = [self._shift_range] + event_ranges
        # Ensure all event types have unique names.
        assert len(self._event_ranges) == len({er.type for er in self._event_ranges})

    @property
    def num_classes(self) -> int:
        return sum(er.max_value - er.min_value + 1 for er in self._event_ranges)

    # The next couple methods are simplified special case methods just for shift
    # events that are intended to be used from within autograph functions.

    def is_shift_event_index(self, index: int) -> bool:
        return (self._shift_range.min_value <= index) and (index <= self._shift_range.max_value)

    @property
    def max_shift_steps(self) -> int:
        return self._shift_range.max_value

    def encode_event(self, event: Event) -> int:
        """Encode an event to an index."""
        offset = 0
        for er in self._event_ranges:
            if event.type == er.type:
                if not er.min_value <= event.value <= er.max_value:
                    raise ValueError(
                        f"Event value {event.value} is not within valid range "
                        f"[{er.min_value}, {er.max_value}] for type {event.type}"
                    )
                return offset + event.value - er.min_value
            offset += er.max_value - er.min_value + 1

        raise ValueError(f"Unknown event type: {event.type}")

    def event_type_range(self, event_type: str) -> Tuple[int, int]:
        """Return [min_id, max_id] for an event type."""
        offset = 0
        for er in self._event_ranges:
            if event_type == er.type:
                return offset, offset + (er.max_value - er.min_value)
            offset += er.max_value - er.min_value + 1

        raise ValueError(f"Unknown event type: {event_type}")

    def decode_event_index(self, index: int) -> Event:
        """Decode an event index to an Event."""
        offset = 0
        for er in self._event_ranges:
            if offset <= index <= offset + er.max_value - er.min_value:
                return Event(type=er.type, value=er.min_value + index - offset)
            offset += er.max_value - er.min_value + 1

        raise ValueError(f"Unknown event index: {index}")
@dataclasses.dataclass
class ProgramGranularity:
    # both tokens_map_fn and program_map_fn should be idempotent
    tokens_map_fn: Callable[[Sequence[int], Codec], Sequence[int]]
    program_map_fn: Callable[[int], int]


def drop_programs(tokens, codec: Codec):
    """Drops program change events from a token sequence."""
    min_program_id, max_program_id = codec.event_type_range("program")
    return tokens[(tokens < min_program_id) | (tokens > max_program_id)]


def programs_to_midi_classes(tokens, codec):
    """Modifies program events to be the first program in the MIDI class."""
    min_program_id, max_program_id = codec.event_type_range("program")
    is_program = (tokens >= min_program_id) & (tokens <= max_program_id)
    return np.where(is_program, min_program_id + 8 * ((tokens - min_program_id) // 8), tokens)


PROGRAM_GRANULARITIES = {
    # "flat" granularity; drop program change tokens and set NoteSequence
    # programs to zero
    "flat": ProgramGranularity(tokens_map_fn=drop_programs, program_map_fn=lambda program: 0),
    # map each program to the first program in its MIDI class
    "midi_class": ProgramGranularity(
        tokens_map_fn=programs_to_midi_classes, program_map_fn=lambda program: 8 * (program // 8)
    ),
    # leave programs as is
    "full": ProgramGranularity(tokens_map_fn=lambda tokens, codec: tokens, program_map_fn=lambda program: program),
}
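
# Illustrative example (comment-only sketch): with "midi_class" granularity every
# program collapses to the first program of its 8-wide General MIDI class, e.g.
# program 30 maps to 24 (the start of the guitar class); "flat" drops program
# tokens entirely, and "full" leaves them untouched.
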

def frame(signal, frame_length, frame_step, pad_end=False, pad_value=0, axis=-1):
    """
    equivalent of tf.signal.frame
    """
    signal_length = signal.shape[axis]
    if pad_end:
        frames_overlap = frame_length - frame_step
        rest_samples = np.abs(signal_length - frames_overlap) % np.abs(frame_length - frames_overlap)
        pad_size = int(frame_length - rest_samples)

        if pad_size != 0:
            pad_axis = [0] * signal.ndim
            pad_axis[axis] = pad_size
            signal = F.pad(signal, pad_axis, "constant", pad_value)
    frames = signal.unfold(axis, frame_length, frame_step)
    return frames
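
# Illustrative example (comment-only sketch): with pad_end=False the trailing
# remainder of the signal is dropped, mirroring tf.signal.frame:
#
#     frame(torch.arange(10.0).unsqueeze(0), frame_length=4, frame_step=4)
#     # -> tensor of shape (1, 2, 4): [[[0, 1, 2, 3], [4, 5, 6, 7]]]
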
def program_to_slakh_program(program):
    # this is done very hackily, probably should use a custom mapping
    for slakh_program in sorted(SLAKH_CLASS_PROGRAMS.values(), reverse=True):
        if program >= slakh_program:
            return slakh_program
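
# Illustrative example (comment-only sketch): the loop walks SLAKH_CLASS_PROGRAMS
# from the highest program number down and returns the first class program that is
# <= the input, so General MIDI program 27 snaps to 26 ("Clean Electric Guitar")
# and program 2 snaps to 0 ("Acoustic Piano").
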

def audio_to_frames(
    samples,
    hop_size: int,
    frame_rate: int,
) -> Tuple[Sequence[Sequence[int]], torch.Tensor]:
    """Convert audio samples to non-overlapping frames and frame times."""
    frame_size = hop_size
    samples = np.pad(samples, [0, frame_size - len(samples) % frame_size], mode="constant")

    # Split audio into frames.
    frames = frame(
        torch.Tensor(samples).unsqueeze(0),
        frame_length=frame_size,
        frame_step=frame_size,
        pad_end=False,  # TODO check why it's off by 1 here when True
    )

    num_frames = len(samples) // frame_size

    times = np.arange(num_frames) / frame_rate
    return frames, times
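
# Illustrative example (comment-only sketch): 800 zero samples at HOP_SIZE=320 are
# right-padded to 960, yielding 3 non-overlapping frames and, at FRAME_RATE=50,
# frame times [0.0, 0.02, 0.04]:
#
#     frames, times = audio_to_frames(np.zeros(800), HOP_SIZE, FRAME_RATE)
#     frames.shape  # -> torch.Size([1, 3, 320])
#     times         # -> array([0.  , 0.02, 0.04])
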

def note_sequence_to_onsets_and_offsets_and_programs(
    ns: note_seq.NoteSequence,
) -> Tuple[Sequence[float], Sequence[NoteEventData]]:
    """Extract onset & offset times and pitches & programs from a NoteSequence.

    The onset & offset times will not necessarily be in sorted order.

    Args:
      ns: NoteSequence from which to extract onsets and offsets.

    Returns:
      times: A list of note onset and offset times.
      values: A list of NoteEventData objects where velocity is zero for note offsets.
    """
    # Sort by program and pitch and put offsets before onsets as a tiebreaker for
    # subsequent stable sort.
    notes = sorted(ns.notes, key=lambda note: (note.is_drum, note.program, note.pitch))
    times = [note.end_time for note in notes if not note.is_drum] + [note.start_time for note in notes]
    values = [
        NoteEventData(pitch=note.pitch, velocity=0, program=note.program, is_drum=False)
        for note in notes
        if not note.is_drum
    ] + [
        NoteEventData(pitch=note.pitch, velocity=note.velocity, program=note.program, is_drum=note.is_drum)
        for note in notes
    ]
    return times, values


def num_velocity_bins_from_codec(codec: Codec):
    """Get number of velocity bins from event codec."""
    lo, hi = codec.event_type_range("velocity")
    return hi - lo


# segment an array into segments of length n
def segment(a, n):
    return [a[i : i + n] for i in range(0, len(a), n)]

def velocity_to_bin(velocity, num_velocity_bins):
    if velocity == 0:
        return 0
    else:
        return math.ceil(num_velocity_bins * velocity / note_seq.MAX_MIDI_VELOCITY)
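
# Illustrative example (comment-only sketch): with the single velocity bin used by
# MidiProcessor (DEFAULT_NUM_VELOCITY_BINS = 1), bin 0 means "note off" and any
# nonzero MIDI velocity lands in bin 1:
#
#     velocity_to_bin(0, 1)    # -> 0
#     velocity_to_bin(64, 1)   # -> 1  (ceil(1 * 64 / 127))
#     velocity_to_bin(127, 1)  # -> 1
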

def note_event_data_to_events(
    state: Optional[NoteEncodingState],
    value: NoteEventData,
    codec: Codec,
) -> Sequence[Event]:
    """Convert note event data to a sequence of events."""
    if value.velocity is None:
        # onsets only, no program or velocity
        return [Event("pitch", value.pitch)]
    else:
        num_velocity_bins = num_velocity_bins_from_codec(codec)
        velocity_bin = velocity_to_bin(value.velocity, num_velocity_bins)
        if value.program is None:
            # onsets + offsets + velocities only, no programs
            if state is not None:
                state.active_pitches[(value.pitch, 0)] = velocity_bin
            return [Event("velocity", velocity_bin), Event("pitch", value.pitch)]
        else:
            if value.is_drum:
                # drum events use a separate vocabulary
                return [Event("velocity", velocity_bin), Event("drum", value.pitch)]
            else:
                # program + velocity + pitch
                if state is not None:
                    state.active_pitches[(value.pitch, value.program)] = velocity_bin
                return [
                    Event("program", value.program),
                    Event("velocity", velocity_bin),
                    Event("pitch", value.pitch),
                ]


def note_encoding_state_to_events(state: NoteEncodingState) -> Sequence[Event]:
    """Output program and pitch events for active notes plus a final tie event."""
    events = []
    for pitch, program in sorted(state.active_pitches.keys(), key=lambda k: k[::-1]):
        if state.active_pitches[(pitch, program)]:
            events += [Event("program", program), Event("pitch", pitch)]
    events.append(Event("tie", 0))
    return events
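
# Illustrative example (comment-only sketch): a state with one active pitch, middle C
# on program 0 at velocity bin 1, is emitted as program/pitch events followed by the
# closing tie event:
#
#     state = NoteEncodingState(active_pitches={(60, 0): 1})
#     note_encoding_state_to_events(state)
#     # -> [Event("program", 0), Event("pitch", 60), Event("tie", 0)]
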

def encode_and_index_events(
    state, event_times, event_values, codec, frame_times, encode_event_fn, encoding_state_to_events_fn=None
):
    """Encode a sequence of timed events and index to audio frame times.

    Encodes time shifts as repeated single step shifts for later run length encoding.

    Optionally, also encodes a sequence of "state events", keeping track of the current encoding state at each audio
    frame. This can be used e.g. to prepend events representing the current state to a targets segment.

    Args:
      state: Initial event encoding state.
      event_times: Sequence of event times.
      event_values: Sequence of event values.
      codec: A Codec object that maps Event objects to indices.
      frame_times: Time for every audio frame.
      encode_event_fn: Function that transforms an event value into a sequence of one
          or more Event objects.
      encoding_state_to_events_fn: Function that transforms the encoding state into a
          sequence of one or more Event objects.

    Returns:
      events: Encoded events and shifts.
      event_start_indices: Corresponding start event index for every audio frame.
          Note: one event can correspond to multiple audio indices due to sampling rate differences. This makes
          splitting sequences tricky because the same event can appear at the end of one sequence and the beginning of
          another.
      event_end_indices: Corresponding end event index for every audio frame. Used
          to ensure when slicing that one chunk ends where the next begins. It should always be true that
          event_end_indices[i] == event_start_indices[i + 1].
      state_events: Encoded "state" events representing the encoding state before
          each event.
      state_event_indices: Corresponding state event index for every audio frame.
    """
    indices = np.argsort(event_times, kind="stable")
    event_steps = [round(event_times[i] * codec.steps_per_second) for i in indices]
    event_values = [event_values[i] for i in indices]

    events = []
    state_events = []
    event_start_indices = []
    state_event_indices = []

    cur_step = 0
    cur_event_idx = 0
    cur_state_event_idx = 0

    def fill_event_start_indices_to_cur_step():
        while (
            len(event_start_indices) < len(frame_times)
            and frame_times[len(event_start_indices)] < cur_step / codec.steps_per_second
        ):
            event_start_indices.append(cur_event_idx)
            state_event_indices.append(cur_state_event_idx)

    for event_step, event_value in zip(event_steps, event_values):
        while event_step > cur_step:
            events.append(codec.encode_event(Event(type="shift", value=1)))
            cur_step += 1
            fill_event_start_indices_to_cur_step()
            cur_event_idx = len(events)
            cur_state_event_idx = len(state_events)
        if encoding_state_to_events_fn:
            # Dump state to state events *before* processing the next event, because
            # we want to capture the state prior to the occurrence of the event.
            for e in encoding_state_to_events_fn(state):
                state_events.append(codec.encode_event(e))

        for e in encode_event_fn(state, event_value, codec):
            events.append(codec.encode_event(e))

    # After the last event, continue filling out the event_start_indices array.
    # The inequality is not strict because if our current step lines up exactly
    # with (the start of) an audio frame, we need to add an additional shift event
    # to "cover" that frame.
    while cur_step / codec.steps_per_second <= frame_times[-1]:
        events.append(codec.encode_event(Event(type="shift", value=1)))
        cur_step += 1
        fill_event_start_indices_to_cur_step()
        cur_event_idx = len(events)

    # Now fill in event_end_indices. We need this extra array to make sure that
    # when we slice events, each slice ends exactly where the subsequent slice
    # begins.
    event_end_indices = event_start_indices[1:] + [len(events)]

    events = np.array(events).astype(np.int32)
    state_events = np.array(state_events).astype(np.int32)
    event_start_indices = segment(np.array(event_start_indices).astype(np.int32), TARGET_FEATURE_LENGTH)
    event_end_indices = segment(np.array(event_end_indices).astype(np.int32), TARGET_FEATURE_LENGTH)
    state_event_indices = segment(np.array(state_event_indices).astype(np.int32), TARGET_FEATURE_LENGTH)

    outputs = []
    for start_indices, end_indices, event_indices in zip(event_start_indices, event_end_indices, state_event_indices):
        outputs.append(
            {
                "inputs": events,
                "event_start_indices": start_indices,
                "event_end_indices": end_indices,
                "state_events": state_events,
                "state_event_indices": event_indices,
            }
        )

    return outputs

def extract_sequence_with_indices(features, state_events_end_token=None, feature_key="inputs"):
    """Extract target sequence corresponding to audio token segment."""
    features = features.copy()
    start_idx = features["event_start_indices"][0]
    end_idx = features["event_end_indices"][-1]

    features[feature_key] = features[feature_key][start_idx:end_idx]

    if state_events_end_token is not None:
        # Extract the state events corresponding to the audio start token, and
        # prepend them to the targets array.
        state_event_start_idx = features["state_event_indices"][0]
        state_event_end_idx = state_event_start_idx + 1
        while features["state_events"][state_event_end_idx - 1] != state_events_end_token:
            state_event_end_idx += 1
        features[feature_key] = np.concatenate(
            [
                features["state_events"][state_event_start_idx:state_event_end_idx],
                features[feature_key],
            ],
            axis=0,
        )

    return features


def map_midi_programs(
    feature, codec: Codec, granularity_type: str = "full", feature_key: str = "inputs"
) -> Mapping[str, Any]:
    """Apply MIDI program map to token sequences."""
    granularity = PROGRAM_GRANULARITIES[granularity_type]

    feature[feature_key] = granularity.tokens_map_fn(feature[feature_key], codec)
    return feature

def run_length_encode_shifts_fn(
    features,
    codec: Codec,
    feature_key: str = "inputs",
    state_change_event_types: Sequence[str] = (),
) -> Mapping[str, Any]:
    """Run-length encode single-step shifts in a token sequence for a given codec.

    Args:
      features: Dict of features to process.
      codec: The Codec to use for shift events.
      feature_key: The feature key for which to run-length encode shifts.
      state_change_event_types: A list of event types that represent state
          changes; tokens corresponding to these event types will be interpreted as state changes and redundant ones
          will be removed.

    Returns:
      A dict of features with single-step shifts run-length encoded.
    """
    state_change_event_ranges = [codec.event_type_range(event_type) for event_type in state_change_event_types]

    def run_length_encode_shifts(features: MutableMapping[str, Any]) -> Mapping[str, Any]:
        """Combine leading/interior shifts, trim trailing shifts.

        Args:
          features: Dict of features to process.

        Returns:
          A dict of features.
        """
        events = features[feature_key]

        shift_steps = 0
        total_shift_steps = 0
        output = np.array([], dtype=np.int32)

        current_state = np.zeros(len(state_change_event_ranges), dtype=np.int32)

        for event in events:
            if codec.is_shift_event_index(event):
                shift_steps += 1
                total_shift_steps += 1

            else:
                # If this event is a state change and has the same value as the current
                # state, we can skip it entirely.
                is_redundant = False
                for i, (min_index, max_index) in enumerate(state_change_event_ranges):
                    if (min_index <= event) and (event <= max_index):
                        if current_state[i] == event:
                            is_redundant = True
                        current_state[i] = event
                if is_redundant:
                    continue

                # Once we've reached a non-shift event, RLE all previous shift events
                # before outputting the non-shift event.
                if shift_steps > 0:
                    shift_steps = total_shift_steps
                    while shift_steps > 0:
                        output_steps = np.minimum(codec.max_shift_steps, shift_steps)
                        output = np.concatenate([output, [output_steps]], axis=0)
                        shift_steps -= output_steps

                output = np.concatenate([output, [event]], axis=0)

        features[feature_key] = output
        return features

    return run_length_encode_shifts(features)
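
# Illustrative example (comment-only sketch, token ids assume the MidiProcessor codec,
# where index 1 is a single-step shift and pitches 60/61 encode to 1061/1062): runs of
# single-step shifts collapse into one token holding the *total* step count so far,
# i.e. shifts become absolute step offsets within the segment:
#
#     features = {"inputs": np.array([1, 1, 1, 1061, 1, 1, 1062])}
#     run_length_encode_shifts_fn(features, codec)["inputs"]
#     # -> array([3, 1061, 5, 1062])
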
def note_representation_processor_chain(features, codec: Codec, note_representation_config: NoteRepresentationConfig):
    tie_token = codec.encode_event(Event("tie", 0))
    state_events_end_token = tie_token if note_representation_config.include_ties else None

    features = extract_sequence_with_indices(
        features, state_events_end_token=state_events_end_token, feature_key="inputs"
    )

    features = map_midi_programs(features, codec)

    features = run_length_encode_shifts_fn(features, codec, state_change_event_types=["velocity", "program"])

    return features

class MidiProcessor:
    def __init__(self):
        self.codec = Codec(
            max_shift_steps=DEFAULT_MAX_SHIFT_SECONDS * DEFAULT_STEPS_PER_SECOND,
            steps_per_second=DEFAULT_STEPS_PER_SECOND,
            event_ranges=[
                EventRange("pitch", note_seq.MIN_MIDI_PITCH, note_seq.MAX_MIDI_PITCH),
                EventRange("velocity", 0, DEFAULT_NUM_VELOCITY_BINS),
                EventRange("tie", 0, 0),
                EventRange("program", note_seq.MIN_MIDI_PROGRAM, note_seq.MAX_MIDI_PROGRAM),
                EventRange("drum", note_seq.MIN_MIDI_PITCH, note_seq.MAX_MIDI_PITCH),
            ],
        )
        self.tokenizer = Tokenizer(self.codec.num_classes)
        self.note_representation_config = NoteRepresentationConfig(onsets_only=False, include_ties=True)

    def __call__(self, midi: Union[bytes, os.PathLike, str]):
        if not isinstance(midi, bytes):
            with open(midi, "rb") as f:
                midi = f.read()

        ns = note_seq.midi_to_note_sequence(midi)
        ns_sus = note_seq.apply_sustain_control_changes(ns)

        for note in ns_sus.notes:
            if not note.is_drum:
                note.program = program_to_slakh_program(note.program)

        samples = np.zeros(int(ns_sus.total_time * SAMPLE_RATE))

        _, frame_times = audio_to_frames(samples, HOP_SIZE, FRAME_RATE)
        times, values = note_sequence_to_onsets_and_offsets_and_programs(ns_sus)

        events = encode_and_index_events(
            state=NoteEncodingState(),
            event_times=times,
            event_values=values,
            frame_times=frame_times,
            codec=self.codec,
            encode_event_fn=note_event_data_to_events,
            encoding_state_to_events_fn=note_encoding_state_to_events,
        )

        events = [
            note_representation_processor_chain(event, self.codec, self.note_representation_config)
            for event in events
        ]
        input_tokens = [self.tokenizer.encode(event["inputs"]) for event in events]

        return input_tokens
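
# Usage sketch (comment-only; "example.mid" is a placeholder path, not a bundled file).
# Because of the relative imports above, this module is normally used through the
# diffusers package rather than run directly:
#
#     from diffusers import MidiProcessor
#
#     processor = MidiProcessor()
#     segments = processor("example.mid")
#     # -> a list of segments, each a list of INPUT_FEATURE_LENGTH (2048) token ids,
#     #    ready to be fed to the spectrogram diffusion pipeline as encoder input.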