{ "cells": [ { "cell_type": "markdown", "id": "a827ed8c", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "5c9168a9e4a427b3e6ce3cd7d6474001", "grade": false, "grade_id": "cell-6a941dadcabee4ce", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "# Exercise Sheet No. 9\n", "\n", "---\n", "\n", "> Machine Learning for Natural Sciences, Summer 2024, TT.-Prof. Pascal Friederich, pascal.friederich@kit.edu\n", "> \n", "> Deadline: Jun 25th 2024, 8:00 am\n", ">\n", "> Tutor: jonas.teufel@kit.edu\n", ">\n", "> **Please ask questions in the forum/discussion board and only contact the Tutor when there are issues with the grading**\n", "---\n", "\n", "**Topic**: This exercise sheet will focus on recurrent neural networks such as GRUs and LSTMs. For practical applications, we'll be looking at the classification of ECG data and text sentiment classification." ] }, { "cell_type": "markdown", "id": "d1ed7b07", "metadata": { "deletable": false, "nbgrader": { "cell_type": "markdown", "checksum": "a879c1dc172b7295c35c8e5d74a122c1", "grade": true, "grade_id": "name-entry", "locked": false, "points": 0, "schema_version": 3, "solution": true, "task": false } }, "source": [ "Please add here your group members' names and student IDs. \n", "\n", "Names: Nils Lennart Bruns\n", "\n", "IDs: 2460137" ] }, { "cell_type": "markdown", "id": "0d79f4d6", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "825a5716d90ba2160202d4c3bc57e271", "grade": false, "grade_id": "cell-75db4b3e225be105", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "# 9 Recurrent Neural Networks\n", "\n", "\n", "## 9.1 Working with Time-Series Data\n", "\n", "**Normal Tabular Data.** With neural network models, we are usually working with relatively strictly defined data - at least regarding its data format. 
More specifically, when thinking about the data from the perspective of numeric arrays and matrices, we need to impose strict assumptions about the *shape* of this data. For a dense neural network that makes predictions on tabular data, for example, the input vector always needs to have the same number and order of elements. For convolutional neural networks that make predictions on image data, we need to make sure that the input images always have the same dimensions.\n", "\n", "**Sequential Data.** For the class of data known as *sequential* data, strict assumptions about the shape are no longer possible for at least one of the data dimensions. One particularly illustrative example of sequential data is *text*. If we want to use neural networks to classify textual data, we run into the problem that texts generally differ in length. One frequently used example in the domain of text processing is the problem of [sentiment classification](https://en.wikipedia.org/wiki/Sentiment_analysis): In the simplest case, we could use a model to predict whether a given text has a positive (*\"I really liked this book!\"*) or a negative (*\"Worst movie I've ever seen!\"*) meaning. In a simplified manner, we can encode text snippets as sequences of characters, where each character could be represented as a [one-hot encoded](https://en.wikipedia.org/wiki/One-hot) vector, for example.\n", "\n", "In other domains, sequential data can be found whenever measurements are recorded over periods of time. This includes, for example, measurements of seismic activity or climate data records that can later be used to perform analyses or to forecast future observations. \n", "\n", "**Recurrent Neural Networks.** Due to the particular properties of sequential data, the special type of *recurrent neural networks (RNNs)* is required to handle the dynamic input shapes. 
These special network architectures treat the sequence dimension of the input data in a special manner to allow for different lengths. The common method of treating this dynamic sequence dimension is the following: Instead of having dedicated parameters for every position along the sequence dimension, the same network layers are applied at every time step. The network is able to maintain information about the whole sequence by accumulating it in an internal state representation that is passed between adjacent time steps." ] }, { "cell_type": "code", "execution_count": 4, "id": "2bb9f0cd", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "0910d6d5c3a1e2091ced89b25a5c3aa0", "grade": false, "grade_id": "cell-717cee8c7f8ad17f", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "import os\n", "import io\n", "import re\n", "import sys\n", "import time\n", "import random\n", "import hashlib\n", "import zipfile\n", "import tempfile\n", "import typing as typ\n", "from collections import defaultdict\n", "from copy import copy\n", "\n", "import requests\n", "import pandas as pd\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from sklearn.metrics import accuracy_score\n", "\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.nn.utils.rnn as rnn\n", "import torch.optim as optim\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 5, "id": "28d23044", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "5c869e94a6b1c0c8e97c01966defc791", "grade": false, "grade_id": "cell-9383204f5fb9dd9d", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "hashcheck = lambda v: hashlib.sha256(v.encode()).hexdigest()[:16]\n", 
"\n", "def sigmoid(x: np.ndarray) -> np.ndarray:\n", "    if isinstance(x, np.ndarray):\n", "        x = x.astype(float)\n", "    \n", "    return 1.0 / (1.0 + np.exp(-x))\n", "\n", "np.sigmoid = sigmoid\n", "\n", "\n", "def nextcloud_download(url: str, raw: bool = False) -> str:\n", "    \"\"\"\n", "    Downloads the content of a file from a nextcloud server and returns \n", "    it either as a string or as a bytes object if the ``raw`` flag is set.\n", "    \"\"\"\n", "    response = requests.get(f'{url}/download')\n", "    content = response.content\n", "    if not raw:\n", "        content = content.decode('utf-8')\n", "    \n", "    return content\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "ccc653f6", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "3e78d693cd467e6f028d0ae62fd02724", "grade": false, "grade_id": "cell-3b2c81f08f9d2ced", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**A Simple Example.** To get a basic understanding of how sequential tasks are structured, we will construct a simple sequential toy problem. We will consider a problem with two binary input values $u_t$, $v_t$ and one binary output value $y_t$. All of these values are signals over time and can be evaluated at different discrete time steps $t \\in \\{0, 1, 2, 3, \\dots\\}$. In this task, the current value of the output signal $y_t$ does not only depend on the current values of the input signals, but also on the input behavior that was observed in the previous time steps!\n", "\n", "Specifically, the output function $y_t$ is defined to be $1$ if and only if the following two conditions are met:\n", "\n", "- Across all previous time steps $\\{ 0, \\dots, t-1 \\}$ a $1$ within the $u_t$ input signal was observed an *odd* number of times. 
(That is, only if $1, 3, 5, 7, ...$ ones have occurred in the input signal so far.)\n", "- In the previous time step, the value of the $v_t$ input signal was $1$ (that is, $v_{t-1} = 1$).\n", "\n", "The previously described sequential input-output problem is illustrated in the following example:" ] }, { "cell_type": "code", "execution_count": 6, "id": "e3b96927", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "7de2aaff4ce358d4f5d7b1a0c980d343", "grade": false, "grade_id": "cell-d7bcc1a1e0c11455", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "##### DO NOT CHANGE #####\n", "ts = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])\n", "us = np.array([1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1])\n", "vs = np.array([0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1])\n", "ys = np.array([0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1])\n", "\n", "# y is 1 at step t exactly when u was 1 an odd number of times before t and v was 1 at step t-1\n", "\n", "fig, (ax_us, ax_vs, ax_ys) = plt.subplots(\n", "    ncols=1,\n", "    nrows=3,\n", "    figsize=(10, 15),\n", ")\n", "\n", "ax_us.step(ts-0.5, us, color='cyan')\n", "ax_us.set_title('Input Signal u')\n", "ax_us.set_xticks(ts)\n", "ax_us.set_xlim([-1, 16])\n", "ax_us.xaxis.grid(ls='--', alpha=0.5)\n", "\n", "ax_vs.step(ts-0.5, vs, color='magenta')\n", "ax_vs.set_title('Input Signal v')\n", "ax_vs.set_xticks(ts)\n", "ax_vs.set_xlim([-1, 16])\n", "ax_vs.xaxis.grid(ls='--', alpha=0.5)\n", "\n", "ax_ys.step(ts-0.5, ys, color='red')\n", "ax_ys.set_title('Output Signal y')\n", "ax_ys.set_xticks(ts)\n", "ax_ys.set_xlim([-1, 16])\n", "ax_ys.xaxis.grid(ls='--', alpha=0.5)\n", "ax_ys.set_xlabel('time steps')\n", "\n", "pass\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "f8e3a0c9", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "9d6ee6c63da577e32293234d5b227f68", "grade": false, "grade_id": "task-9-1", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.1 (2 points)** In this task, implement the function ``custom_step``, which receives the momentary values of the two input signals ``u`` and ``v`` for each time step. Additionally, the function receives a dictionary object ``data`` which is empty at the beginning, but retains its content between time steps! You may use this dictionary object to implement some kind of \"memory\" to solve the recurrent task. 
In each time step, the ``custom_step`` function is supposed to return the predicted output value ``y`` according to the previously described rule." ] }, { "cell_type": "code", "execution_count": 7, "id": "58272687", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "a2bc8fa5bb2f19d4b43b8da19a50f0b8", "grade": false, "grade_id": "ans-9-1", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [ { "data": { "text/plain": [ "(array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]),\n", " array([1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1]),\n", " array([0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1]),\n", " array([0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1]))" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def custom_step(u: float, v: float, data: dict) -> float:\n", "    \"\"\"\n", "    This function receives the current value of the input signals ``u`` and ``v`` \n", "    as well as the memory storage ``data`` and is supposed to return the predicted \n", "    output value ``y``, which solves the previously described sequential toy problem.\n", "    \n", "    :param u: The current value of input signal u\n", "    :param v: The current value of input signal v\n", "    :param data: A dictionary object that retains its content between the time steps\n", "    \n", "    :returns: The predicted value for y\n", "    \"\"\"\n", "    # Read the memory from before this step. The defaults cover the first step\n", "    # and make the function work with a plain dict as well as a defaultdict.\n", "    old_u_count = data.get(\"u_count\", 0)\n", "    old_v = data.get(\"v\", 0)\n", "\n", "    # Update the memory with the current inputs for the next step.\n", "    data[\"u_count\"] = old_u_count + int(u == 1)\n", "    data[\"v\"] = v\n", "\n", "    return int(old_u_count % 2 == 1 and old_v == 1)\n", "\n", "ts, us, vs, ys" ] }, { "cell_type": "code", "execution_count": 8, "id": "7e461483", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "53dcb8dd58b96971ef57bd129bb18a12", "grade": true, "grade_id": "test-9-1-custom-step", 
"locked": true, "points": 2, "schema_version": 3, "solution": false, "task": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "t: 00 - true: 0 - pred: 0\n", "t: 01 - true: 0 - pred: 0\n", "t: 02 - true: 1 - pred: 1\n", "t: 03 - true: 0 - pred: 0\n", "t: 04 - true: 0 - pred: 0\n", "t: 05 - true: 0 - pred: 0\n", "t: 06 - true: 0 - pred: 0\n", "t: 07 - true: 0 - pred: 0\n", "t: 08 - true: 1 - pred: 1\n", "t: 09 - true: 0 - pred: 0\n", "t: 10 - true: 0 - pred: 0\n", "t: 11 - true: 1 - pred: 1\n", "t: 12 - true: 0 - pred: 0\n", "t: 13 - true: 0 - pred: 0\n", "t: 14 - true: 0 - pred: 0\n", "t: 15 - true: 1 - pred: 1\n" ] } ], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-1-custom-step - possible points: 2\n", "\n", "data = defaultdict(int)\n", "\n", "for t, u, v, y in zip(ts, us, vs, ys):\n", "    y_pred = custom_step(u, v, data)\n", "    print(f't: {t:02d} - true: {y} - pred: {y_pred}')\n", "    assert np.isclose(y, y_pred, atol=0.1), f'predicted {y_pred} != expected {y}'\n", "    \n", "# NOTE: The hidden tests will perform the same kind of test with a number of randomly \n", "# generated input and corresponding output signals of different lengths.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "b9f076f5", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "9e4a44faffb62624cd394308064ac815", "grade": false, "grade_id": "cell-a3e327c7b3a7d96d", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "## 9.2 Gated Recurrent Unit (GRU)\n", "\n", "The first kind of RNN layer that we want to look at is the *Gated Recurrent Unit (GRU)*. Compared to the default model found in the literature or presented in the lecture, we will consider a slightly extended version which also includes a *Readout Gate*. 
The following illustration shows a basic overview of this GRU architecture:\n", "\n", "\n", "\n", "The basic idea of this layer is that a hidden state vector $\\mathbf{h}$ is maintained and updated throughout each time step of the sequence. These hidden state values are supposed to act as some kind of \"memory\" which can be used to store information about previous time steps within the same sequence. The general workflow within a single forward pass of the GRU cell is to use the information that is contained within the current input vector $\\mathbf{x}_t$ and state vector $\\mathbf{h}_{t-1}$ of the previous time step to calculate (1) the readout value of the current time step and (2) the updated state vector $\\mathbf{h}_t$ for the next time step.\n", "\n", "The variation of the GRU cell that is used in this exercise can be decomposed into 4 general sections:\n", "\n", "- The *readout* gate determines the current readout value $\\mathbf{p}_t$ based on the input and previous state values.\n", "- The *reset* gate selectively determines which of the hidden states should be reset by multiplying each hidden state value with a $[0, 1]$ value based on input and previous state values.\n", "- The *selection* gate determines for each state value whether the previous state value should be maintained or the updated state values from the update gate should be used instead. This is done by determining the factor for a weighted average between the previous and updated state values: $(1 - r_z) \\cdot \\mathrm{previous\\;state} + (r_z) \\cdot \\mathrm{updated\\;state}$ \n", "- The *update* gate determines new state values based on the inputs and reset previous states. An important detail to consider here is that the state values themselves are ``tanh`` activated, which means that they can only ever be values in the range $[-1, 1]$." 
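, "\n", "\n", "As a compact summary of the four gates, the forward pass can be written out as equations. This is a sketch that follows the ``gru_step`` function used later in this sheet; the symbols $\\mathbf{r}_t$, $\\mathbf{z}_t$, $\\tilde{\\mathbf{h}}_t$, the concatenation operator $||$ and the element-wise product $\\odot$ are notation assumed here, with $\\mathbf{z}_t$ playing the role of the selection factor:\n", "\n", "$$\n", "\\begin{aligned}\n", "\\mathbf{r}_t &= \\sigma(W_r \\cdot (\\mathbf{x}_t || \\mathbf{h}_{t-1}) + b_r) \\\\\n", "\\mathbf{z}_t &= \\sigma(W_z \\cdot (\\mathbf{x}_t || \\mathbf{h}_{t-1}) + b_z) \\\\\n", "\\tilde{\\mathbf{h}}_t &= \\mathrm{tanh}(W_h \\cdot (\\mathbf{x}_t || (\\mathbf{r}_t \\odot \\mathbf{h}_{t-1})) + b_h) \\\\\n", "\\mathbf{h}_t &= (1 - \\mathbf{z}_t) \\odot \\mathbf{h}_{t-1} + \\mathbf{z}_t \\odot \\tilde{\\mathbf{h}}_t \\\\\n", "\\mathbf{p}_t &= \\sigma(W_p \\cdot (\\mathbf{x}_t || \\mathbf{h}_{t-1}) + b_p)\n", "\\end{aligned}\n", "$$\n", "\n", "Note that in this variant the readout $\\mathbf{p}_t$ is computed from the *previous* state $\\mathbf{h}_{t-1}$, not from the freshly updated state $\\mathbf{h}_t$."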
] }, { "cell_type": "markdown", "id": "13d8ab09", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "1efb783747a2e34aaae4106bfa43a117", "grade": false, "grade_id": "task-9-2", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.2 (1 point)** To familiarize yourself with the architecture of the GRU cell, your next task is simply to determine the appropriate shape tuples for the weight matrices $W_r, W_z, W_h, W_p$ given the fixed shapes of the input signal ``x_shape``, the hidden vector ``h_shape`` and the readout vector ``p_shape``." ] }, { "cell_type": "code", "execution_count": 46, "id": "59c158ff", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "2e1900dad8fdd91eaf30e51e30365719", "grade": false, "grade_id": "ans-9-2-gru-shapes", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "\n", "x_shape = (5, )\n", "h_shape = (7, )\n", "p_shape = (2, )\n", "\n", "# TASK: Given the example shapes for the various input and output vectors, fill in the \n", "# expected shapes of the various weight matrices that are used in the GRU architecture.\n", "\n", "# Every gate multiplies its weight matrix with the concatenation of the input and the\n", "# hidden state, so each matrix has x_shape[0] + h_shape[0] columns and one row per output.\n", "in_dim = x_shape[0] + h_shape[0]\n", "\n", "W_r_shape: tuple[int, int] = (h_shape[0], in_dim)\n", "W_z_shape: tuple[int, int] = (h_shape[0], in_dim)\n", "W_h_shape: tuple[int, int] = (h_shape[0], in_dim)\n", "W_p_shape: tuple[int, int] = (p_shape[0], in_dim)\n", "\n", "assert isinstance(W_p_shape, tuple)\n", "assert len(W_p_shape) == 2\n", "assert hashcheck(f'W_p{sum(W_p_shape)}') == '6c7416d9e1335f1b', 'W_p shape likely incorrect'" ] }, { "cell_type": "code", "execution_count": 47, "id": "133c4f28", "metadata": { "deletable": false, 
"editable": false, "nbgrader": { "cell_type": "code", "checksum": "eefe461dceabd49b598d73bdd05a4ff3", "grade": true, "grade_id": "test-9-2-gru-shapes", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-2-gru-shapes - possible points: 1\n", "\n", "assert isinstance(W_r_shape, tuple)\n", "assert len(W_r_shape) == 2\n", "assert hashcheck(f'W_r{sum(W_r_shape)}') == 'f1e3f6eba02f53b1', 'W_r shape likely incorrect'\n", "\n", "assert isinstance(W_z_shape, tuple)\n", "assert len(W_z_shape) == 2\n", "assert hashcheck(f'W_z{sum(W_z_shape)}') == 'e5f3c32506daf289', 'W_z shape likely incorrect'\n", "\n", "assert isinstance(W_h_shape, tuple)\n", "assert len(W_h_shape) == 2\n", "assert hashcheck(f'W_h{sum(W_h_shape)}') == '18645d6bfc1fc4b4', 'W_h shape likely incorrect'\n", "\n", "assert isinstance(W_p_shape, tuple)\n", "assert len(W_p_shape) == 2\n", "assert hashcheck(f'W_p{sum(W_p_shape)}') == '6c7416d9e1335f1b', 'W_p shape likely incorrect'\n", "\n", "# NOTE: The hidden tests will simply check for the exact expected shape.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "0fac7c86", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "5747eaf42e0896c0f9d6f6272d991954", "grade": false, "grade_id": "task-9-3", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.3 (6 points)** In this task, we want to solve the same toy problem as previously solved in Task 9.1. However, instead of using an arbitrary dictionary memory, we'll try to implement a ``GRU`` cell which implements the same functionality.\n", "\n", "The following code cell implements the ``gru_step`` function, which implements the GRU cell introduced above. In each time step, the function receives the current values of the input signals $u$ and $v$ as arguments. 
The function additionally receives the current *state* $h_0$, $h_1$ of the cell's memory. The function also accepts the various weight and bias tensors ($W_r$, $b_r$, $W_z$, ...) of the GRU cell as additional arguments. Based on these inputs, the function implements one forward pass of the GRU cell and is supposed to return a tuple of the following three values:\n", "- The updated value for the first state $h_0$\n", "- The updated value for the second state $h_1$\n", "- The current readout value $p$ which is supposed to match the value of the output signal $y$\n", "  at the current time step.\n", "\n", "Your task is to populate the weight and bias matrices $W_r$, $b_r$, $W_z$, $b_z$, $W_h$, $b_h$, $W_p$, $b_p$ in such a way that the readout value $p$ returned by the function matches the expected output signal $y$.\n", "\n", "**NOTE.** This is a difficult exercise - it might make sense to initially skip this exercise and return to it later on." ] }, { "cell_type": "code", "execution_count": 195, "id": "187bcedf", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "057713dedc617c45e5e357c93ba4bd84", "grade": false, "grade_id": "cell-92455f2c6f01dafa", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "def gru_step(u: int, \n", "             v: int, \n", "             h_0: float,\n", "             h_1: float,\n", "             # parameters for the reset gate\n", "             W_r: np.ndarray,\n", "             b_r: np.ndarray,\n", "             # parameters for the selection gate\n", "             W_z: np.ndarray,\n", "             b_z: np.ndarray,\n", "             # parameters for the update gate\n", "             W_h: np.ndarray,\n", "             b_h: np.ndarray,\n", "             # parameters for the readout gate\n", "             W_p: np.ndarray,\n", "             b_p: np.ndarray,\n", "             ) -> tuple[float, float, float]:\n", "    \"\"\"\n", "    This function implements the forward pass of a GRU cell.\n", "    \n", "    Based on the current values of the input signals ``u`` and ``v``, the current \n", "    values of the memory states ``h_0`` and ``h_1`` and the weight and bias matrices \n", "    of the GRU cell, it computes the updated memory states and the readout value.\n", "    \n", "    :param u: The current value of the u signal\n", "    :param v: The current value of the v signal\n", "    :param h_0: The current value of the first memory cell\n", "    :param h_1: The current value of the second memory cell\n", "    :param W_r: the weight matrix of the reset gate\n", "    :param b_r: the bias vector of the reset gate\n", "    :param W_z: the weight matrix of the selection gate\n", "    :param b_z: the bias vector of the selection gate\n", "    :param W_h: the weight matrix of the update gate\n", "    :param b_h: the bias vector of the update gate\n", "    :param W_p: the weight matrix of the readout gate\n", "    :param b_p: the bias vector of the readout gate\n", "    \n", "    :returns: A tuple of the following three values:\n", "        - the updated value of h_0\n", "        - the updated value of h_1\n", "        - the readout value p\n", "    \"\"\"\n", "    \n", "    h_t = np.array([h_0, h_1])\n", "    inp = np.array([u, v, h_t[0], h_t[1]], dtype=float)\n", "    \n", "    # ~ reset vector\n", "    r_t = np.sigmoid((W_r @ inp) + b_r)\n", "    \n", "    # ~ selection vector\n", "    z_t = np.sigmoid((W_z @ inp) + b_z)\n", "    \n", "    # ~ new state vector\n", "    h_t_ = h_t * r_t\n", "    inp_tilde = np.array([u, v, h_t_[0], h_t_[1]])\n", "    h_tilde = np.tanh((W_h @ inp_tilde) + b_h)\n", "    \n", "    # ~ updating the state\n", "    h_t = (h_t * (1 - z_t)) + (h_tilde * z_t)\n", "    \n", "    # ~ readout value (based on the inputs and the previous state)\n", "    p = np.sigmoid((W_p @ np.array([u, v, h_0, h_1])) + b_p)\n", "    \n", "    return h_t[0], h_t[1], p\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": null, "id": "7ec6e703", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "6278a55a6a2297aeedc7c4212fb11098", "grade": false, "grade_id": "ans-9-3-gru-step", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# ~ \"reset\" weight matrix & bias\n", "W_r = np.array([\n", "    [0, 0, 0, 0],\n", " 
[0, 0, 0, 0],\n", "], dtype=float)\n", "b_r = np.array([\n", " 0,\n", " 0,\n", "])\n", "\n", "# ~ \"gate\" weight matrix & bias\n", "W_z = np.array([\n", " [0, 0, 0, 0],\n", " [0, 0, 0, 0],\n", "], dtype=float)\n", "b_z = np.array([\n", " 0,\n", " 0\n", "])\n", "\n", "# ~ \"update\" weight matrix & bias\n", "W_h = np.array([\n", " [0, 0, 0, 0],\n", " [0, 0, 0, 0]\n", "], dtype=float)\n", "b_h = np.array([\n", " 0,\n", " 0,\n", "])\n", "\n", "# ~ \"readout\" weight matrix & bias\n", "W_p = np.array([\n", " [0, 0, 0, 0],\n", "], dtype=float)\n", "b_p = np.array([\n", " 0\n", "])\n", "\n", "# TASK: Fill in appropriate values for the given weight matrices and bias vectors \n", "# to realize the above described toy problem using the GRU cell.\n", "\n", "# HINT: With mathematical operations of matrix multiplication and addition, \n", "# you won't be able to match the expected integer output *exactly*. \n", "# However, the tests will check with an absolute tolerance of only 0.2, \n", "# which means you'll only have to match the output value within a \n", "# certain range. For an expected output value of 1, for example, a \n", "# predicted value of 0.83 would still be counted as correct!\n", "\n", "# HINT: Large input numbers will saturate the sigmoid and tanh functions. Think\n", "# about how to use this to your advantage to be able to work with (almost)\n", "# binary numbers instead of having to deal with continuous ones. The task\n", "# can entirely be solved by just storing \"binary\" flags in the two hidden \n", "# states of the memory.\n", "\n", "# HINT: Think about a way of debugging your solution. You might want to think about \n", "# writing some simple test cases yourself to understand how the function \n", "# is behaving. Also think about how you could better track the internal \n", "# workings of the layer. 
This could be as simple as printing the intermediate \n", "# vectors or as advanced as tracking them visually in some plots.\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "ad8782f6", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "f020248f9285ae3909b966c26039b335", "grade": true, "grade_id": "test-9-3-gru-step", "locked": true, "points": 6, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-3-gru-step - possible points: 6\n", "\n", "# This is the example signal from the introduction of the sequential task\n", "us = np.array([1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1])\n", "vs = np.array([0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1])\n", "ys = np.array([0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1])\n", "\n", "h_0 = 0\n", "h_1 = 0\n", "for t, u, v, y in zip(ts, us, vs, ys):\n", " h_0, h_1, p = gru_step(\n", " u, v, h_0, h_1, \n", " W_r, b_r, \n", " W_z, b_z, \n", " W_h, b_h, \n", " W_p, b_p\n", " )\n", " \n", " y_pred = p[0]\n", " assert np.isclose(y, y_pred, atol=0.2), f'predicted {y_pred} != expected {y}'\n", " \n", " \n", "# NOTE: The hidden tests will evaluate the weight and bias arrays on a number of randomly \n", "# generated sequences of u, v input signals and corresponding output signals y.\n", " \n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "eca34c0d", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "4ebf83d12705738e7352b9a1bca7c817", "grade": false, "grade_id": "cell-5d99de620befe40c", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "## 9.3 Long-Short Term Memory (LSTM)\n", "\n", "The *Long-Short Term Memory (LSTM)* is an alternative to the GRU cell which was introduced by [Hochreiter and 
Schmidhuber](https://doi.org/10.1162/neco.1997.9.8.1735). The following image illustrates the basic architecture of an LSTM cell:\n", "\n", "\n", "\n", "Much like the GRU cell, the LSTM cell also internally maintains a hidden representation $\\mathbf{h}_t$ which acts as a kind of memory that can be used to keep track of an internal *state* across different time steps. However, unlike the GRU cell, the LSTM cell naturally includes a *readout gate* which separates the internal hidden state $\\mathbf{h}_t$ and the readout signal $\\mathbf{p}_t$. Additionally, the selection mechanism that determines which parts of the previous state $\\mathbf{h}_{t-1}$ to keep and which parts of the new state $\\mathbf{\\tilde{h}}_{t}$ to update is implemented differently.\n", "\n", "Specifically, the LSTM cell can be described by the following equations. The two selection signals $f_t, i_t$ determine how to compose the current time step's hidden state vector from the previous and updated version, where \n", "\n", "$$\n", "f_t = \\sigma(W_f \\cdot (h_{t-1} || x_t) + b_f)\n", "$$\n", "\n", "is the selection signal that determines which parts of the previous time step's hidden vector to keep and \n", "\n", "$$\n", "i_t = \\sigma(W_i \\cdot (h_{t-1} || x_t) + b_i)\n", "$$\n", "\n", "is the selection signal that determines which parts of the updated state vector to use. 
The updated state vector \n", "\n", "$$\n", "\\tilde{h}_t = \\mathrm{tanh}(W_h \\cdot (h_{t-1} || x_t) + b_h)\n", "$$\n", "\n", "is calculated by a linear layer with a ``tanh`` activation, based on the input vector and the previous state vector.\n", "Finally, the new hidden state vector\n", "\n", "$$\n", "h_t = f_t \\cdot h_{t-1} + i_t \\cdot \\tilde{h}_t\n", "$$\n", "\n", "is an element-wise weighted combination of the previous state vector and the updated state vector.\n", "The readout signal \n", "\n", "$$\n", "p_t = \\mathrm{tanh}(h_t) \\cdot \\sigma(W_o \\cdot (h_{t-1} || x_t) + b_o)\n", "$$\n", "\n", "is derived by masking the ``tanh``-activated hidden state vector of the current time step." ] }, { "cell_type": "markdown", "id": "d0d8ed41", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "728d2c12c81b42dd25b1206cbc481cb7", "grade": false, "grade_id": "task-9-4", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.4 (3 points)** The goal of this task is to complete the implementation of the ``LSTM`` class below. To do this, you'll first have to properly initialize the weight matrices and bias vectors $W_f, b_f, W_i, b_i, W_h, b_h, W_o, b_o$ in the constructor of the class. In this task, you can simply initialize all weight and bias arrays with all values set to zero. Additionally, you'll have to implement the ``forward`` method, which takes the current input vector $\\mathbf{x}_t$ and the previous hidden state vector $\\mathbf{h}_{t-1}$ as inputs and is expected to return a tuple consisting of the updated hidden state vector $\\mathbf{h}_t$ and the readout vector $\\mathbf{p}_t$." 
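, "\n", "\n", "As a sanity check for the all-zero initialization, the forward pass can be worked out by hand. With all weights and biases equal to zero, every gate pre-activation is $0$, so $\\sigma(0) = 0.5$ and $\\mathrm{tanh}(0) = 0$. The example vector $h_{t-1} = (2, 3, -2)$ is the one used in the visible test for this task; the symbol $o_t$ for the readout mask $\\sigma(W_o \\cdot (h_{t-1} || x_t) + b_o)$ is notation assumed here:\n", "\n", "$$\n", "\\begin{aligned}\n", "f_t &= i_t = o_t = 0.5, \\qquad \\tilde{h}_t = 0 \\\\\n", "h_t &= 0.5 \\cdot h_{t-1} + 0.5 \\cdot 0 = (1, 1.5, -1) \\\\\n", "p_t &= \\mathrm{tanh}(h_t) \\cdot 0.5 \\approx (0.381, 0.453, -0.381)\n", "\\end{aligned}\n", "$$\n", "\n", "In this special case the result does not depend on the input $x_t$ at all, which makes it a convenient first check of the ``forward`` method."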
] }, { "cell_type": "code", "execution_count": 74, "id": "c7f40b6c", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "f0d5bb7222bb46df6b9a66d905ba79c4", "grade": false, "grade_id": "ans-9-4", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "class LSTM:\n", "    \"\"\"\n", "    numpy implementation of the Long-Short Term Memory (LSTM) cell.\n", "    \"\"\"\n", "    \n", "    def __init__(self,\n", "                 num_inputs: int,\n", "                 num_states: int,\n", "                 ):\n", "        \n", "        self.num_inputs = num_inputs\n", "        self.num_states = num_states\n", "        \n", "        # Every gate acts on the concatenation (h || x), so each weight matrix\n", "        # has the shape (num_states, num_states + num_inputs).\n", "        w_shape = (num_states, num_states + num_inputs)\n", "        \n", "        # selection gate f: which parts of the previous state to keep\n", "        self.W_f: np.ndarray = np.zeros(w_shape)\n", "        self.b_f: np.ndarray = np.zeros((num_states,))\n", "        \n", "        # selection gate i: which parts of the updated state to use\n", "        self.W_i: np.ndarray = np.zeros(w_shape)\n", "        self.b_i: np.ndarray = np.zeros((num_states,))\n", "        \n", "        # update gate\n", "        self.W_h: np.ndarray = np.zeros(w_shape)\n", "        self.b_h: np.ndarray = np.zeros((num_states,))\n", "        \n", "        # readout gate\n", "        self.W_o: np.ndarray = np.zeros(w_shape)\n", "        self.b_o: np.ndarray = np.zeros((num_states,))\n", "    \n", "    def forward(self,\n", "                x: np.ndarray,\n", "                h: np.ndarray,\n", "                ) -> tuple[np.ndarray, np.ndarray]:\n", "        \"\"\"\n", "        Given the current time step's input array ``x`` of the shape (input_dim, ) and the \n", "        previous time step's hidden state array ``h`` of the shape (hidden_dim, ), this \n", "        method returns a tuple (``h_new``, ``p``) consisting of the updated hidden state \n", "        vector and the readout vector of the shape (hidden_dim, ).\n", "        \"\"\"\n", "\n", "        sigmoid = lambda a: 1 / (1 + np.exp(-a))\n", "        \n", "        # concatenation (h_{t-1} || x_t), in the same order as in the equations above\n", "        conc = np.concatenate((h, x))\n", "        ft = sigmoid(np.dot(self.W_f, conc) + self.b_f)\n", "        it = sigmoid(np.dot(self.W_i, conc) + self.b_i)\n", "        ht = np.tanh(np.dot(self.W_h, conc) + self.b_h)\n", "        ot = sigmoid(np.dot(self.W_o, conc) + self.b_o)\n", "\n", "        h_new = h*ft + it*ht\n", "        p = np.tanh(h_new)*ot\n", "        return h_new, p\n", "    \n" ] }, { "cell_type": "code", "execution_count": 75, "id": "3b17eb4c", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "58cf095f4b37f1f89063f374c3c3163b", "grade": true, "grade_id": "test-9-4-lstm-constructor", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-4-lstm-constructor - possible points: 1\n", "\n", "# checking for one example instantiation of the LSTM class\n", "lstm = LSTM(2, 3)\n", "assert isinstance(lstm, LSTM)\n", "\n", "# all matrices and bias vectors have to be numpy arrays!\n", "assert isinstance(lstm.W_f, np.ndarray)\n", "assert isinstance(lstm.b_f, np.ndarray)\n", "assert isinstance(lstm.W_i, np.ndarray)\n", "assert isinstance(lstm.b_i, np.ndarray)\n", "assert isinstance(lstm.W_h, np.ndarray)\n", "assert isinstance(lstm.b_h, np.ndarray)\n", "assert isinstance(lstm.W_o, np.ndarray)\n", "assert isinstance(lstm.b_o, np.ndarray)\n", "\n", "# all \"W\" variables have to be matrices (2-dimensional)\n", "assert len(lstm.W_f.shape) == 2\n", "assert len(lstm.W_i.shape) == 2\n", "assert len(lstm.W_h.shape) == 2\n", "assert len(lstm.W_o.shape) == 2\n", "\n", "# all \"b\" variables have to be biases (1-dimensional)\n", "assert len(lstm.b_f.shape) == 1\n", "assert len(lstm.b_i.shape) == 1\n", "assert len(lstm.b_h.shape) == 1\n", "assert len(lstm.b_o.shape) == 1\n", "\n", "# checking the exact shape of one gate as an example\n", "assert hashcheck(f'W_f{lstm.W_f.shape}') == '9e7dcaf998a20515', 'W_f likely wrong shape'\n", "assert hashcheck(f'b_f{lstm.b_f.shape}') == 'd7ff6f69428e93c9', 'b_f likely wrong shape'\n", "\n", "# NOTE: The hidden tests will construct a number of test cases with combinations of \n", "# random input and hidden dimensions. 
For each combination, a new LSTM instance \n", "# will be created and the shapes of the weights and bias arrays will be checked \n", "# against the expected shapes.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 76, "id": "0022f219", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "9b7b462c2360511b90a0b253fe92cbfb", "grade": true, "grade_id": "test-9-4-lstm-forward", "locked": true, "points": 2, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-4-lstm-forward - possible points: 2\n", "\n", "lstm = LSTM(2, 3)\n", "x = np.array([1.0, 1.0])\n", "h = np.array([2.0, 3.0, -2.0])\n", "\n", "h_new, p = lstm.forward(x, h)\n", "\n", "assert isinstance(h_new, np.ndarray)\n", "assert h_new.shape == (3, )\n", "\n", "assert isinstance(p, np.ndarray)\n", "assert p.shape == (3, )\n", "\n", "# For the given input arrays these are the expected outputs of the forward method\n", "# HINT: For these results, the assumption is that all the matrices and biases are \n", "# initialized with all zero values in the constructor of the LSTM class\n", "assert np.isclose(h_new, np.array([1.0, 1.5, -1.0]), rtol=0.1).all(), 'h_new calculation incorrect'\n", "assert np.isclose(p, np.array([ 0.380, 0.452, -0.380]), rtol=0.1).all(), 'p calculation incorrect'\n", "\n", "# NOTE: The hidden tests will execute the forward pass for a few sample input and hidden \n", "# vectors and compare the results of the given implementation with the ground truth \n", "# implementation.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "b78950fd", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "839c5113535abed9d340b8b708bc2f89", "grade": false, "grade_id": "cell-dc60f9831168a4f0", "locked": true, "schema_version": 3, "solution": false, "task": false } }, 
"source": [ "## 9.4 ECG Classification\n", "\n", "**ECG Classification.** In this section, we want to apply the problem to an actual classification task on sequentially structured data. Specifically, we are going to use a dataset of [*Electrocardiogram (ECG)*](https://en.wikipedia.org/wiki/Electrocardiography) data. An ECG is a medical test that measures the electrical activity of the heart over a period of time using electrodes placed on the skin. This data is inherently sequential and is characterized by temporal patterns that are crucial for identifying heart conditions. The resulting measurement is a function over time and may, for example, look similar the following image:\n", "\n", "\n", "\n", "Such ECG measurements can be used to detect certain heart conditions which will create certain characteristic deviations in the course of the signal. To diagnose these heart conditions, specifically trained medical staff is able to identify the characteristic deviations in the signal. Alternatively, since the deviations caused by the various diseases follow a characteristic pattern, it is also possible to apply a machine learning model to use a patient's ECG signal as an input to classify a possible heart condition.\n", "\n", "**ECG Dataset.** In this section we'll be using a dataset of ECG data that consists of roughly 5k patient's records. Each record is a function over time representing only a single heartbeat within a larger EKG measurement. The records of different patients have different lengths." ] }, { "cell_type": "markdown", "id": "a8a67b33", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "a9bad1b5eee1a894d4a58e24c31f06e7", "grade": false, "grade_id": "cell-ba366cb8fd6212a6", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**Data Loading.** The first step in working with the given ECG dataset is to load the dataset into memory. 
However, compared to previous datasets you've worked with, sequential data comes in a less convenient format. Since the different elements of a sequential dataset may have different sequence lengths, they cannot be easily encoded into a single CSV file. Because of this, sequential datasets are sometimes given in more *unusual* data formats, where the individual sequences are often encoded as individual files. This circumstance can somewhat complicate the process of loading sequential datasets into a sensible format for the subsequent training of the machine learning models.\n", "\n", "The given ECG dataset is available as a ZIP archive ``ecg.zip``. This archive contains multiple files - exactly *one* for each element in the dataset. Each of these files is a ``.npy`` numpy file, which contains the binary encoding of a numpy array. The content of these files encodes a 1-dimensional numpy array that represents the ECG signal of one patient. The *names* of the files contain the ID of the patient from whom the sample was recorded, as well as the target label (normal/abnormal) associated with each recording in the following format:\n", "\n", "``\"patient_{patient_id}_{label}.npy\"``\n", "\n", "Some examples are: \n", "\n", "- ``patient_0001_normal.npy``\n", "- ``patient_0022_normal.npy``\n", "- ``patient_2906_abnormal.npy``" ] }, { "cell_type": "markdown", "id": "a6c16a35", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "844d8bc2ad37f311f837804c2b64cb80", "grade": false, "grade_id": "task-9-5", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.5 (2 points)** In this task, the goal is to implement the ``load_dataset`` function. This function accepts the path to the dataset ZIP archive and is supposed to return a list of dictionaries, where each dict represents a single element in the dataset. 
Each of these dicts should contain the following three key value pairs:\n", "\n", "- ``id``: The *integer* ID of the patient from whom the ECG recording was collected.\n", "- ``target``: The *integer* target value annotation, which is 0 for ECG data labeled as \"normal\" and is 1 for \"abnormal\" ECG data\n", "- ``signal``: A *list* of *lists* containing *float* values that represents the time series recording of the ECG itself.\n", "\n", "An example for the expected final result may look like this:\n", "\n", "```python\n", "dataset = [\n", " {\n", " 'id': 0,\n", " 'target': 0,\n", " # Wrapping each element in a list may seem redundant but we do this here because \n", " # the model later on expects each sequence to be a 2-dimensional tensor!\n", " 'signal': [[0.9], [0.8], ...],\n", " },\n", " {\n", " 'id': 1,\n", " 'target': 0,\n", " 'signal': [[0.7], [0.8], ...],\n", " },\n", " ...\n", "]\n", "```\n", "\n", "The path to the downloaded dataset archive is available in the ``zip_path`` variable." 
] }, { "cell_type": "code", "execution_count": 77, "id": "246ac2a8", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "44b3a5786354f113ac43466034dda7d2", "grade": false, "grade_id": "cell-64a2b3d0701ef35b", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# The dataset archive ZIP file will be downloaded and stored on the local system at \n", "# the path given in \"zip_path\"\n", "content = nextcloud_download('https://bwsyncandshare.kit.edu/s/xMctqCzFp3XRtKM', raw=True)\n", "zip_path = 'ecg_.zip'\n", "with open(zip_path, 'wb') as file:\n", " file.write(content)\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 100, "id": "60121261", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "17c1866fb3a9cc6b62c5fca6d43604b1", "grade": false, "grade_id": "ans-9-5", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# TASK: Implement the following function which loads the dataset from its ZIP \n", "# archive format into a list of dictionaries that contain all the relevant \n", "# information about the various data points.\n", "\n", "# HINT: If not already the case, it makes sense to familiarize yourself with the \n", "# \".npy\" numpy file format and how to load the data contained in these kinds \n", "# of files.\n", "\n", "def load_dataset(path: str) -> list[dict]:\n", " \"\"\"\n", " This function loads the dataset from the ZIP archive at the given absolute string \n", " ``path`` and returns a list of dict objects, where each dict contains the information \n", " about one element of the dataset.\n", " \n", " Each dict contains the following 3 keys:\n", " - id: The unique integer id of the element\n", " - target: The integer target value annotation\n", " - signal: A list of list float values that represents the signal\n", " 
\"\"\"\n", " dataset = []\n", " with zipfile.ZipFile(path) as ecgzip:\n", " files = ecgzip.namelist()\n", " for file in files:\n", " with ecgzip.open(file, mode='r') as single:\n", " datum = {\n", " 'signal': np.load(single).reshape(-1,1).tolist(),\n", " 'id': int(file[8:12]),\n", " 'target': 1 if \"abnormal\"in file else 0\n", " }\n", " dataset.append(datum)\n", " return dataset\n", "\n", "ds = load_dataset(\"ecg_.zip\")" ] }, { "cell_type": "code", "execution_count": 103, "id": "fe7e6f5f", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "2c5aa9b9215450e0b570341d2ee09466", "grade": true, "grade_id": "test-9-5-load-dataset", "locked": true, "points": 2, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-5-load-dataset - possible points: 2\n", "\n", "# making sure the dataset ZIP archive exists\n", "assert os.path.exists(zip_path)\n", "assert os.path.isfile(zip_path)\n", "assert zip_path.endswith('.zip')\n", "\n", "# loading the dataset\n", "dataset = load_dataset(zip_path)\n", "assert len(dataset) == 4766\n", "\n", "# simply checking for the correctness of the first element as a proxy for \n", "# the entire dataset.\n", "first_element = dataset[0]\n", "assert 'id' in first_element\n", "assert isinstance(first_element['id'], int)\n", "\n", "assert 'target' in first_element\n", "assert isinstance(first_element['target'], int)\n", "\n", "assert 'signal' in first_element\n", "assert isinstance(first_element['signal'], list)\n", "assert len(first_element['signal']) > 0\n", "assert len(np.array(first_element['signal']).shape) == 2, \"The signal should be 2-dim arrays!\"\n", "\n", "# NOTE: The hidden tests will perform similar type and shape checks for all the \n", "# elements of the dataset.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 104, "id": "36981e4f", "metadata": { "deletable": false, 
"editable": false, "nbgrader": { "cell_type": "code", "checksum": "4cc8d1d8dddfbee29a8219e5b6bf6221", "grade": false, "grade_id": "cell-01833fecab335082", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [ { "data": { "text/plain": [ "Text(0, 0.5, 'signal value')" ] }, "execution_count": 104, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "##### DO NOT CHANGE #####\n", "target_data_map: dict[int, list] = defaultdict(list)\n", "for data in dataset:\n", " target_data_map[data['target']].append(data['signal'])\n", "\n", "# Based on this distinction of normal(0) and abnormal(1) we can now plot some \n", "# example signals for both labels to see if there is any distinguishable visual \n", "# difference between them.\n", "\n", "fig, (ax_normal, ax_abnormal) = plt.subplots(\n", " ncols=1,\n", " nrows=2,\n", " figsize=(10, 15)\n", ")\n", "\n", "num_samples = 20\n", "for i in range(num_samples):\n", " ax_normal.plot(target_data_map[0][i], color='blue', alpha=0.2)\n", " ax_abnormal.plot(target_data_map[1][i], color='red', alpha=0.2)\n", "\n", "ax_normal.set_title('Normal ECG Signals')\n", "ax_normal.set_xlabel('time')\n", "ax_normal.set_ylabel('signal value')\n", "\n", "ax_abnormal.set_title('Abnormal ECG Signals')\n", "ax_abnormal.set_xlabel('time')\n", "ax_abnormal.set_ylabel('signal value')\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "bd55b3d1", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "8fbb2c66983693ad5ddcc4d41c7debd9", "grade": false, "grade_id": "cell-e4d7a0799a45b49a", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "As we can see in these plots, the normal and abnormal ECG signals show quite a distinctive visual difference! While the normal signals are relatively constant in the middle section, while the abnormal signals show an local minimum and maximum in the middle.\n", "\n", "This obvious visual difference already indicates that the RNN model will likely also be able to achieve good prediction performance." 
] }, { "cell_type": "markdown", "id": "8a7a4423", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "7a3e01d4109aa997eeb9c74f9bb531f3", "grade": false, "grade_id": "cell-d86c85952da88a32", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**PyTorch Sequential Data Handling.** One can imagine that working with sequences of different lengths also introduces challenges on a technical level. Deep Learning frameworks such as PyTorch handle data in the form of *tensors*. During the training of the model, each individual element is represented as a tensor, and multiple elements can be combined into a *batch* tensor of higher dimension to increase the computational efficiency of the training process, as it allows the model to train with multiple samples at the same time. This batching of multiple elements is relatively easy with static data types such as tabular data or image data. In the case of image data, for example, every image can be represented as a matrix of the same dimensions and multiple images can easily be combined by stacking them in a third dimension. However, for sequential data - where elements can have different dimensions - the batching process isn't as easy. Trying to stack sequences of different lengths is like trying to construct a matrix with column vectors of different lengths - which is simply not possible according to the definition of a matrix.\n", " \n", "One possible solution to the problem of sequence batching is to simply not use batching. If the model is trained by only ever seeing a single input sequence at a time, the issue of different sequence lengths never arises. 
While this is certainly possible, it greatly decreases the computational efficiency of the training process and might have a negative impact on model performance as well.\n", "\n", "As an alternative to processing single sequences, PyTorch offers the option to use batching for sequential data by using *padding*. In any given batch of sequences, all sequences are artificially extended to the length of the longest sequence by appending arbitrary elements to the end (usually zeros). Additionally, for every element in the batch we store the information about the length of the original sequence. The combination of this information - padded sequence tensor and sequence lengths - is then given to the model to perform the predictions and subsequent model updates.\n", "\n", "" ] }, { "cell_type": "markdown", "id": "8fd54143", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "294e4904b245b0eceb696fb03d91cc2c", "grade": false, "grade_id": "task-9-6", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.6 (2 points)** The goal of this task is to implement the ``batch_sequences`` function, which accepts a list ``seq_list`` of sequential dataset elements and returns a tuple ``(batch_tensor, lengths)``, where ``batch_tensor`` is a ``torch.Tensor`` instance which contains all of these sequences at the same time and which can be used later on to train the RNN models in a batched manner. ``lengths`` is supposed to be a list containing the integer length values of the input sequences.\n", "\n", "For this task we assume that there are ``num_sequences`` sequences given in the input list. Each element of that list is itself a list. These lists are the actual sequential dataset elements and may have different lengths, but we assume that there exists one list which is the longest with a length of ``max_length``. 
Each element of these lists is again a vector (list) with ``num_dimensions`` entries. The function should return a float PyTorch tensor object with the shape:\n", "\n", "``(num_sequences, max_length, num_dimensions)``\n", "\n", "To do this, the function should perform *zero-padding* for the sequences of different lengths. \n", "\n", "The following example illustrates the expected behavior of the batching process:\n", "\n", "```python\n", "# input:\n", "seq_list = [\n", " [[1, 1], [2, 2], [3, 3]],\n", " [[4, 4]],\n", "]\n", "\n", "# output:\n", "batch_tensor = torch.Tensor([\n", " [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]],\n", " [[4.0, 4.0], [0.0, 0.0], [0.0, 0.0]],\n", "]) # shape: (2, 3, 2)\n", "lengths = [\n", " 3,\n", " 1,\n", "]\n", "```" ] }, { "cell_type": "code", "execution_count": 135, "id": "0fa7592e", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "86f4b21c05bc7089aac031791ab3e300", "grade": false, "grade_id": "ans-9-6", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "import torch\n", "import torch.nn.utils.rnn as rnn\n", "\n", "# TASK: Implement the batch_sequences function which turns the raw list of sequences \n", "# into a single batched torch tensor.\n", "\n", "# HINT: torch may already provide a suitable utility function that solves the \n", "# majority of this task for you.\n", "\n", "# HINT: Make sure that the tensor has the data type torch.float32!\n", "\n", "def batch_sequences(seq_list: list[list]) -> tuple[torch.Tensor, list[int]]:\n", " \"\"\"\n", " Given a list of vector sequences ``seq_list`` this function performs zero padding \n", " and stacking of the sequences to return a single batched torch tensor with the \n", " shape (num_sequences, max_length, num_dimensions)\n", " \"\"\"\n", "\n", " num_dimensions = len(np.array(seq_list[0]).shape)\n", " ll = [len(t) for t in seq_list]\n", " max_length = max(ll)\n", " \n", " padded = 
[torch.nn.functional.pad(torch.Tensor(t), (0,0)*(num_dimensions-1)+(0, max_length-len(t))) for t in seq_list]\n", " return torch.stack(padded, dim=0), ll" ] }, { "cell_type": "code", "execution_count": 136, "id": "99e3067e", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "ceefaa5671bac5f067e5fdfb7a8e95b0", "grade": true, "grade_id": "test-9-6-batch-sequences", "locked": true, "points": 2, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-6-batch-sequences - possible points: 2\n", "\n", "seq_list_1 = [\n", " [[1.0], [2.0], [3.0]],\n", " [[1.0], [1.0]],\n", " [[3.0], [9.0], [8.0], [3.5]]\n", "]\n", "batch_1, lengths_1 = batch_sequences(seq_list_1)\n", "assert isinstance(batch_1, torch.Tensor)\n", "assert batch_1.shape == (3, 4, 1)\n", "assert tuple(lengths_1) == (3, 2, 4)\n", "\n", "seq_list_2 = [\n", " [[1.0, 1.0], [2.0, 2.0]],\n", " [[1.0, 0.0]],\n", " [[1.0, 1.0], [2.0, 1.0], [2.0, 0.0]],\n", " [[2.0, 2.0]],\n", "]\n", "batch_2, lengths_2 = batch_sequences(seq_list_2)\n", "assert isinstance(batch_2, torch.Tensor)\n", "assert batch_2.shape == (4, 3, 2)\n", "assert tuple(lengths_2) == (2, 1, 3, 1)\n", "\n", "# NOTE: The hidden tests will construct some randomly generated test cases, where \n", "# the different input sequences are randomly generated and the derived batch \n", "# tensors are checked for their shape.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "d1428d23", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "f4ff6dd8a6b44e6b96fedcbc045e32c0", "grade": false, "grade_id": "cell-cc478e50b1997ad0", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**PyTorch Recurrent Neural Networks.** In the previous sections, we've worked with custom implementations of the ``GRU`` and ``LSTM`` recurrent neural 
network cells. However, deep learning frameworks usually provide default implementations for well-known architectures like these. In PyTorch, these default implementations can easily be accessed via the ``torch.nn.GRU`` and ``torch.nn.LSTM`` classes.\n", "\n", "From a technical perspective, the only peculiarity when working with recurrent cells like these is how to properly handle the padded input batches. As previously discussed, the input to the model's forward pass is the padded batch of sequences ``x`` and the list of original sequence ``lengths``. However, the recurrent layers should receive the data in the *\"packed\"* format so that the padded positions are skipped. The PyTorch utility function ``pack_padded_sequence`` can be used to convert the padded format into the packed format." ] }, { "cell_type": "markdown", "id": "d0ad4412", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "0016c2c9637b9a83c48115bbbed43bf3", "grade": false, "grade_id": "task-9-7", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.7 (5 points)** In this section, your task is to complete the implementations of the ``GruModel`` and ``LstmModel`` classes. Specifically, you'll have to implement the model's constructor in the ``__init__`` method and the model forward pass in the ``forward`` method.\n", "\n", "Each model should perform a binary classification for *each sequence* in the given batch of input data. 
This means for each sequence in the input batch the model should return one float value in the range $[0, 1]$ where 0 represents the \"normal\" class and 1 represents the \"abnormal\" class, such that a [binary cross entropy loss](https://towardsdatascience.com/understanding-binary-cross-entropy-log-loss-a-visual-explanation-a3ac6025181a) can be used to train the model.\n", "\n", "To achieve this you'll have to extend the given model classes with additional transformation layers to achieve the desired output state such that the forward method outputs a tensor of the shape ``(batch_size, 1)`` where all values are in the range $[0, 1]$." ] }, { "cell_type": "code", "execution_count": 185, "id": "528707cf", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "9d13f04f6eaf53f08da7b86563b2f2bd", "grade": false, "grade_id": "ans-9-7", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "\n", "class GruModel(nn.Module):\n", " \"\"\"\n", " Implementation for a model that performs time-series classification based \n", " on the *gated recurrent unit (GRU)* cell architecture.\n", " \"\"\"\n", " \n", " def __init__(self,\n", " input_dim: int,\n", " hidden_dim: int,\n", " ):\n", " nn.Module.__init__(self)\n", " self.input_dim = input_dim\n", " self.hidden_dim = hidden_dim\n", " \n", " self.lay_gru = nn.GRU(input_dim, hidden_dim, batch_first=True)\n", " \n", " self.lay_output = nn.Linear(hidden_dim, 1)\n", " \n", " def forward(self, x: torch.Tensor, lengths: list[int]) -> torch.Tensor:\n", " \"\"\"\n", " Receives the padded input tensor ``x`` of shape (batch_size, max_length, input_dim) and the \n", " list ``lengths`` of the batch sequence lengths as input and returns a prediction output \n", " tensor of the shape (batch_size, 1) which consits of the sigmoid activated classification \n", " predictions for each sequence in the batch.\n", " \"\"\"\n", " packed_input = rnn.pack_padded_sequence(x, 
lengths, batch_first=True, enforce_sorted=False)\n", " packed_output, hidden = self.lay_gru(packed_input)\n", " \n", " # hidden has the shape (1, batch_size, hidden_dim); hidden[0] is the final state of the single GRU layer\n", " res = torch.sigmoid(self.lay_output(hidden[0])).reshape((-1,1))\n", " \n", " return res\n", " \n", " \n", "class LstmModel(nn.Module):\n", " \"\"\"\n", " Implementation of a model that performs time-series classification based \n", " on the *long-short term memory* cell architecture.\n", " \"\"\"\n", " \n", " def __init__(self,\n", " input_dim: int,\n", " hidden_dim: int,\n", " ):\n", " nn.Module.__init__(self)\n", " self.input_dim = input_dim\n", " self.hidden_dim = hidden_dim\n", " \n", " self.lay_lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)\n", " self.lay_output = nn.Linear(hidden_dim, 1)\n", "\n", " \n", " def forward(self, x: torch.Tensor, lengths: list[int]) -> torch.Tensor:\n", " \"\"\"\n", " Receives the padded input tensor ``x`` of shape (batch_size, max_length, input_dim) and the \n", " list ``lengths`` of the batch sequence lengths as input and returns a prediction output \n", " tensor of the shape (batch_size, 1) which consists of the sigmoid activated classification \n", " predictions for each sequence in the batch.\n", " \"\"\"\n", " packed_input = rnn.pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)\n", " # nn.LSTM returns (output, (h_n, c_n)); h_n[0] is the final hidden state of the single LSTM layer\n", " packed_output, (h_n, c_n) = self.lay_lstm(packed_input)\n", " \n", " res = torch.sigmoid(self.lay_output(h_n[0])).reshape((-1,1))\n", " \n", " return res" ] }, { "cell_type": "code", "execution_count": 186, "id": "0c7e1a55", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "c75e7c22e19e5b371083f643863f0032", "grade": true, "grade_id": "test-9-7-gru-model", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-7-gru-model - possible points: 1\n", "\n", "input_dim = 3\n", "hidden_dim = 64\n", "model_gru = GruModel(input_dim, hidden_dim)\n", "\n", "# 
At first we check if there even are any GRU modules used in the model\n", "assert any([isinstance(child, nn.GRU) for child in model_gru.children()]), (\n", " 'The GruModel class likely does not implement torch GRU layer'\n", ")\n", "# Then we make sure that the hidden size for those GRU modules has been \n", "# set correctely.\n", "for child in model_gru.children():\n", " if isinstance(child, nn.GRU):\n", " assert child.hidden_size == hidden_dim, 'GRU layer hidden size incorrect'\n", "\n", "# The general idea of the functional tests here is that we create random input \n", "# vectors, perform a model forward pass and see if the model output has the correct \n", "# shape, is sigmoid activated and whether or not it is possible to compute a \n", "# proper backward pass / gradient calculation for the given model implementation.\n", "for _ in range(3):\n", " \n", " batch_size = np.random.randint(2, 10)\n", " max_length = np.random.randint(10, 20)\n", " \n", " x = torch.tensor(\n", " np.random.rand(batch_size, max_length, input_dim),\n", " dtype=torch.float32,\n", " requires_grad=True,\n", " )\n", " lengths = [random.randint(2, max_length) for _ in range(batch_size)]\n", " \n", " # Forward pass\n", " out_pred = model_gru(x, lengths)\n", " assert isinstance(out_pred, torch.Tensor), 'model output is not a torch Tensor'\n", " assert out_pred.shape == (batch_size, 1), 'model output shape incorrect'\n", " \n", " # Output activation\n", " array_pred = out_pred.detach().numpy()\n", " for value in array_pred:\n", " assert 0 <= value <= 1, 'model output likely not sigmoid activated'\n", " \n", " # Backward pass\n", " torch.mean(out_pred).backward()\n", " assert x.grad is not None, 'model backward gradient calculation not working'\n", " \n", "# NOTE: The hidden tests perform the same kind of tests but for a larger number of \n", "# randomly generated input tensors.\n", " \n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 187, "id": "ab67225e", 
"metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "69452715eda2013116c76eafebbc31fe", "grade": true, "grade_id": "test-9-7-lstm-model", "locked": true, "points": 2, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-7-lstm-model - possible points: 2\n", "\n", "input_dim = 3\n", "hidden_dim = 64\n", "model_lstm = LstmModel(input_dim, hidden_dim)\n", "\n", "# At first we check if there even are any LSTM modules used in the model\n", "assert any([isinstance(child, nn.LSTM) for child in model_lstm.children()]), (\n", " 'The LstmModel class likely does not implement torch LSTM layer'\n", ")\n", "# Then we make sure that the hidden size for those LSTM modules has been \n", "# set correctely.\n", "for child in model_lstm.children():\n", " if isinstance(child, nn.LSTM):\n", " assert child.hidden_size == hidden_dim, 'LSTM layer hidden size incorrect'\n", "\n", "# The general idea of the functional tests here is that we create random input \n", "# vectors, perform a model forward pass and see if the model output has the correct \n", "# shape, is sigmoid activated and whether or not it is possible to compute a \n", "# proper backward pass / gradient calculation for the given model implementation.\n", "for _ in range(3):\n", " \n", " batch_size = np.random.randint(2, 10)\n", " max_length = np.random.randint(10, 20)\n", " \n", " x = torch.tensor(\n", " np.random.rand(batch_size, max_length, input_dim),\n", " dtype=torch.float32,\n", " requires_grad=True,\n", " )\n", " lengths = [random.randint(2, max_length) for _ in range(batch_size)]\n", " \n", " # Forward pass\n", " out_pred = model_lstm(x, lengths)\n", " assert isinstance(out_pred, torch.Tensor), 'model output is not a torch Tensor'\n", " assert out_pred.shape == (batch_size, 1), 'model output shape incorrect'\n", " \n", " # Output activation\n", " array_pred = out_pred.detach().numpy()\n", " for 
value in array_pred:\n", " assert 0 <= value <= 1, 'model output likely not sigmoid activated'\n", " \n", " # Backward pass\n", " torch.mean(out_pred).backward()\n", " assert x.grad is not None, 'model backward gradient calculation not working'\n", " \n", "# NOTE: The hidden tests perform the same kind of tests but for a larger number of \n", "# randomly generated input tensors.\n", " \n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 188, "id": "9568a5dc", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "9c07de66bb17f41fea0aee7a200164f7", "grade": false, "grade_id": "cell-3e2d2365bcbe693b", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "def train_model(model: nn.Module,\n", " dataset: list[dict],\n", " epochs: int = 5,\n", " batch_size: int = 64,\n", " learning_rate: float = 1e-3,\n", " ) -> list[float]:\n", " \"\"\"\n", " Trains the given ``model`` using the given ``dataset`` of sequential data. The model \n", " will be trained for the given number of ``epochs`` using the given ``batch_size`` \n", " and ``learning_rate``.\n", " \n", " The dataset should be a list containing dictionary objects where each dict represents \n", " one element of the dataset. 
Each dict should have the following keys:\n", " - signal: A list representation of the ECG signal\n", " - target: The integer target value annotation\n", " \n", " The function returns a list of float values which represents the history of losses \n", " over the various training epochs.\n", " \"\"\"\n", " indices = list(range(len(dataset)))\n", " \n", " criterion = nn.BCELoss()\n", " optimizer = optim.Adam(model.parameters(), lr=learning_rate)\n", " losses_epochs = []\n", " \n", " print('starting model training...')\n", " for epoch in range(epochs):\n", " \n", " indices_epoch = copy(indices)\n", " random.shuffle(indices_epoch)\n", " \n", " losses: list[float] = []\n", " index_epoch = 0\n", " while index_epoch < len(indices) - batch_size:\n", " \n", " indices_batch = indices_epoch[index_epoch:index_epoch+batch_size]\n", " data_batch = [dataset[index] for index in indices_batch]\n", " sequences_batch = [data['signal'] for data in data_batch]\n", " target_batch = [data['target'] for data in data_batch]\n", " \n", " x_batch, lengths_batch = batch_sequences(sequences_batch)\n", " out_pred = model(x_batch, lengths_batch)\n", " out_true = torch.Tensor(target_batch).unsqueeze(-1)\n", " \n", " model.zero_grad()\n", " loss = criterion(out_pred, out_true)\n", " losses.append(loss.item())\n", " \n", " loss.backward()\n", " optimizer.step()\n", " \n", " index_epoch += batch_size\n", " \n", " print(f' * epoch ({epoch+1}/{epochs}) - loss: {np.mean(losses):.3f}')\n", " losses_epochs.append(np.mean(losses))\n", " \n", " return losses_epochs\n", "\n", "\n", "def evaluate_model(model: nn.Module,\n", " sequences: list[list],\n", " ) -> list[float]:\n", " \"\"\"\n", " evaluates the given ``model`` predictions on the given set of ``sequences``\n", " \n", " The function returns a list of the model predictions for each sequence in the \n", " given ``sequences`` list, which will be float values in the range [0, 1]\n", " \"\"\"\n", " out_pred = []\n", " for seq in sequences:\n", " x, lengths = 
batch_sequences([seq])\n", " out = model(x, lengths)\n", " out_pred.append(out.detach().numpy()[0])\n", " \n", " return out_pred\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 189, "id": "6bf393c7", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "7870afb56f389651bb9e4cf276122b26", "grade": false, "grade_id": "cell-983b78421a004758", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ~ Setting Up Dataset \n", "\n", "dataset = load_dataset(zip_path)\n", "random.shuffle(dataset)\n", "\n", "dataset_test = dataset[:500]\n", "dataset_train = dataset[500:]\n", "\n", "out_true = [data['target'] for data in dataset_test]\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 190, "id": "31b93688", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "9be0b0a41742cf318cba9cbcd8ef8c24", "grade": false, "grade_id": "cell-5ddf4de1ae1ddc31", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting model training...\n", " * epoch (1/5) - loss: 0.640\n", " * epoch (2/5) - loss: 0.355\n", " * epoch (3/5) - loss: 0.255\n", " * epoch (4/5) - loss: 0.210\n", " * epoch (5/5) - loss: 0.182\n", "GRU Model - test accuracy: 94.00%\n" ] } ], "source": [ "##### DO NOT CHANGE #####\n", "# ~ Training GRU Model\n", "\n", "model_gru = GruModel(1, 64)\n", "\n", "train_model(\n", " model=model_gru,\n", " dataset=dataset_train,\n", " epochs=5,\n", ")\n", "out_gru = evaluate_model(\n", " model=model_gru,\n", " sequences=[data['signal'] for data in dataset_test]\n", ")\n", "acc_gru = accuracy_score(out_true, np.round(out_gru))\n", "print(f'GRU Model - test accuracy: {acc_gru*100:.2f}%')\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", 
"execution_count": 191, "id": "dcb1e5db", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "50372bcbabd783042bade3f94ee3844b", "grade": true, "grade_id": "test-9-7-gru-accuracy", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-7-gru-accuracy - possible points: 1\n", "\n", "assert len(dataset) == 4766\n", "assert model_gru is not None\n", "assert isinstance(model_gru, GruModel)\n", "\n", "assert acc_gru > 0.9\n", "\n", "# NOTE: The hidden tests will evaluate the model on an unseen test set and check \n", "# for a minimum performance, which the model will definitely achieve if it \n", "# fulfills the accuracy requirement above as well.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": 192, "id": "373246d8", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "e4d9ba07e886190740b5bd5a42d0454d", "grade": false, "grade_id": "cell-e4f754138a662aa4", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting model training...\n", " * epoch (1/5) - loss: 0.611\n", " * epoch (2/5) - loss: 0.404\n", " * epoch (3/5) - loss: 0.314\n", " * epoch (4/5) - loss: 0.232\n", " * epoch (5/5) - loss: 0.195\n", "LSTM Model - test accuracy: 96.80%\n" ] } ], "source": [ "##### DO NOT CHANGE #####\n", "# ~ Training LSTM Model\n", "\n", "model_lstm = LstmModel(1, 64)\n", "train_model(\n", " model=model_lstm,\n", " dataset=dataset_train,\n", " epochs=5,\n", ")\n", "out_lstm = evaluate_model(\n", " model=model_lstm,\n", " sequences=[data['signal'] for data in dataset_test],\n", ")\n", "acc_lstm = accuracy_score(out_true, np.round(out_lstm))\n", "print(f'LSTM Model - test accuracy: {acc_lstm*100:.2f}%')\n", "\n", "##### DO NOT CHANGE #####" ] }, { 
"cell_type": "code", "execution_count": 194, "id": "50b9d3c0", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "b5455797f2baec4fded8b3bf4b12c8be", "grade": true, "grade_id": "test-9-7-lstm-accuracy", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-7-lstm-accuracy - possible points: 1\n", "\n", "assert len(dataset) == 4766\n", "assert model_lstm is not None\n", "assert isinstance(model_lstm, LstmModel)\n", "\n", "assert acc_lstm > 0.9\n", "\n", "# NOTE: The hidden tests will evaluate the model on an unseen test set and check \n", "# for a minimum performance, which the model will definitely achieve if it \n", "# fulfills the accuracy requirement above as well.\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "b32505db", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "affd895e1f087d5be7b42ec141bfb186", "grade": false, "grade_id": "cell-b234e581cb5a5c1c", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "## 9.5 Sentiment Analysis\n", "\n", "**Free-Form Task.** In this final section, you'll have the chance to freely apply everything you've learned so far. The only goal of this section is to create a model that solves the sentiment classification problem described below. While it is recommended to apply your knowledge of recurrent neural networks to solve this task, in principle you are free to use any method you want, so long as it solves the given problem.\n", "\n", "**Sentiment Classification.** In this section, we will apply the concept of recurrent neural networks to the task of sentiment classification. For a given piece of text, the goal is to classify the underlying sentiment as either *negative* or *positive*. 
To illustrate this, consider the following simple examples: \n", "\n", "- I really liked this cake. (Positive)\n", "- I've had a terrible day! (Negative)\n", "\n", "Based on these, one might think that such a task mainly comes down to detecting the presence of certain negative and positive indicators (good, bad, ...). While this may be true for the majority of samples in practice, one also has to consider edge cases such as negations and sarcasm:\n", "\n", "- This restaurant really isn't bad! (Positive)\n", "- That meeting was sooo necessary... I am very glad it happened (Negative)\n", "\n", "**Tweet Sentiment Dataset.** The specific dataset we'll be using consists of 10k short [tweets](https://en.wikipedia.org/wiki/Twitter) in which users give their opinion concerning a variety of different video games. Each tweet is annotated with either the *Positive* or the *Negative* label, indicating the general sentiment of its content. The following are some examples from the dataset:\n", "\n", "- RhandlerR RhandlerR RhandlerR RhandlerR . This doesn’t look right. At all. Why am I just noticing this now? Shouldn’t the payouts be better in M4? Or are these typos. (Negative)\n", "- Finally made it into the base 10, just to try to hold on to it. Feels like enough really good meta right now in pirate pirates with all of the odd dh to even mage in high legend... (Positive)\n" ] }, { "cell_type": "markdown", "id": "3ec5e9bf", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "941783f90188016b343107d64f50fd08", "grade": false, "grade_id": "task-9-8", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "**🛠️ Task 9.8 (10 points)** In this task, the overall goal is to solve the previously described sentiment classification problem for the Twitter dataset. 
While it is recommended to do so using an RNN model, you are free to use any method you want, so long as your model class implements the ``SentimentInterface`` that is introduced below. To solve the exercise, simply assign a new instance of your custom model implementation to the ``model`` variable in the cells below.\n", "\n", "The points for this exercise will be distributed incrementally based on the performance of your model.\n", "\n", "- ``+1 points``: If your model class conforms to the given interface and is able to produce proper predictions \n", " (only considers general functionality, not yet the correctness of the predictions!)\n", "- ``+3 points``: If your model achieves a performance which is better than random guessing (>55% accuracy)\n", "- ``+3 points``: If your model achieves *decent* performance (>70% accuracy)\n", "- ``+3 points``: If your model achieves *good* performance (>85% accuracy)\n", "\n", "The performance will be evaluated on an unseen test dataset. This test dataset consists of 100 elements and is balanced regarding the output labels (same number of negative and positive samples).\n", "\n", "**HINT.** Training your final model during grading will likely produce unstable results. Even if the training showed good results on your local machine, there is always the possibility of an unfavorable initialization leading to worse performance. Additionally, during grading there will be a cell execution timeout of *1 minute*, which might not be enough time for a model to achieve sufficient performance. Consequently, you might want to train your model extensively on your local machine and only load the pre-trained model weights in your graded submission. During grading, you'll be able to access [publicly shared](https://docs.nextcloud.com/server/latest/user_manual/en/files/sharing.html) files from BwSync&Share by using the ``nextcloud_download`` function. 
You can use this, for example, to download a file containing pre-trained model weights. You can find more information about how to export and import such model weights [online](https://pytorch.org/tutorials/beginner/saving_loading_models.html).\n", "\n", "**NOTE.** During grading, internet access to everything except BwSync&Share will be blocked. This means it won't be possible to simply redirect the task to a publicly available service such as GPT or HuggingFace!" ] }, { "cell_type": "code", "execution_count": null, "id": "130115e8", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "a898c3604492eec90d5057c8087a3c6b", "grade": false, "grade_id": "cell-1b6799c21078ef5a", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "\n", "class SentimentInterface:\n", " \"\"\"\n", " This interface has to be implemented for a sentiment classification model which \n", " solves the given task. 
To implement this interface for a custom model class, simply \n", " inherit from it and override the ``predict_sentiment`` method with your custom \n", " solution.\n", " \"\"\"\n", " \n", " def predict_sentiment(self, text: str) -> typ.Literal['Negative', 'Positive']:\n", " \"\"\"\n", " This function receives a single string ``text`` and is supposed to implement \n", " some sort of sentiment classification method to finally return either of the \n", " string labels \"Negative\" or \"Positive\" to determine the given text's overall \n", " sentiment.\n", " \n", " :param text: The input text to classify\n", " \n", " :returns: String literal which is either \"Negative\" in case of a negative \n", " sentiment or \"Positive\" in case of a positive sentiment.\n", " \"\"\"\n", " raise NotImplementedError()\n", " \n", " \n", "# ~ Mock Implementation of a naive sentiment classification approach\n", "\n", "class MockModel(SentimentInterface):\n", " \n", " def predict_sentiment(self, text: str) -> str:\n", " if 'good' in text:\n", " return 'Positive'\n", " else:\n", " return 'Negative'\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": null, "id": "86721d7b", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "b5d02fb903405f4624ae7933b049ea68", "grade": false, "grade_id": "cell-e75de694c578d425", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# The dataset is downloaded from the file share server and stored as a Dataframe\n", "content = nextcloud_download('https://bwsyncandshare.kit.edu/s/nAy6oMPdP7cKAz7')\n", "df: pd.DataFrame = pd.read_csv(io.StringIO(content))\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": null, "id": "19c092da", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "d40f5050882a62877490d80d2cbcb73e", "grade": false, "grade_id": "ans-9-8-explore", 
"locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# == DATA EXPLORATION ==\n", "# You may use this cell to perform some exploratory data analysis in order to get \n", "# a better overview of what type of dataset you'll be working with.\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "f0e788a6", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "d3b64c36b024a430fa16f9c8d1916b47", "grade": false, "grade_id": "ans-9-8-model", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# == MODEL IMPLEMENTATION ==\n", "# You may use this cell to define your model implementation.\n", "# In case you are planning to implement an RNN solution, here are several pointers on \n", "# what aspects to consider:\n", "# - The first challenge is how to convert the text itself into a sequence which a \n", "# neural network model can understand. You'll have to find some way to meaningfully \n", "# convert the string into a series of numeric vectors.\n", "# For this task, several solutions of varying complexity exist. A simple solution might \n", "# be a one-hot encoding for characters and more complex solutions can use third-party \n", "# \"tokenizers\".\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "850697ae", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "42f28b315c3d14fefd321967b723a43e", "grade": false, "grade_id": "ans-9-8-training", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# === MODEL TRAINING ===\n", "# You may use this cell to train your previously defined model.\n", "# For your training, keep in mind the cell execution timeout during grading! 
If a \n", "# cell runs longer than 5 minutes it will automatically be terminated, so make \n", "# sure to set up your training accordingly!\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "67b8b3d7", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "1679a5053e3dbeb8fb710cecd1c9f7c5", "grade": false, "grade_id": "ans-9-8-evaluation", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# === MODEL EVALUATION ===\n", "# You may use this cell to evaluate the performance of your model. \n", "# To do this, you might want to think about splitting the dataset that is available to you \n", "# into appropriate train and validation sets. Your model's performance on an unseen \n", "# validation split should give you a better indication of the final performance on \n", "# the unseen test set.\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "7b2f5b29", "metadata": { "deletable": false, "nbgrader": { "cell_type": "code", "checksum": "162430540d5902589a2e4d997f23ca0a", "grade": false, "grade_id": "ans-9-8-loading", "locked": false, "schema_version": 3, "solution": true, "task": false } }, "outputs": [], "source": [ "# TASK: Create a new instance of your custom model implementation which inherits \n", "# from the \"SentimentInterface\" and assign this instance to the model \n", "# variable below.\n", "\n", "# HINT: During the grading of the notebook, general access to the internet is blocked.\n", "# This means that it won't be possible to simply send the text snippets to \n", "# a ChatGPT or HuggingFace API to solve the task for you!\n", "\n", "# HINT: However, access to the university's BwSync&Share cloud is enabled. 
This means \n", "# that you'll be able to circumvent the cell execution timeout for model training \n", "# by loading pre-trained model weights from your cloud storage.\n", "\n", "model: SentimentInterface = None\n", "\n", "# YOUR CODE HERE\n", "raise NotImplementedError()" ] }, { "cell_type": "code", "execution_count": null, "id": "9cf21f2b", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "82e018f7dc5a78b332209b423400c747", "grade": true, "grade_id": "test-9-8-functionality", "locked": true, "points": 1, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-8-functionality - possible points: 1\n", "\n", "assert model is not None\n", "assert isinstance(model, SentimentInterface), 'make sure the model implements the interface!'\n", "\n", "reviews_ = [\n", " 'This is the worst game I have ever played in my life!',\n", " 'The GREATEST experience of all time - I am so happy',\n", "]\n", "for review in reviews_:\n", " prediction = model.predict_sentiment(review)\n", " print(f'input: \"{review}\" - prediction: {prediction}')\n", " assert isinstance(prediction, str)\n", " assert prediction in ['Negative', 'Positive']\n", "\n", "# NOTE: There are no additional hidden test cases in this cell\n", " \n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": null, "id": "e1c30951", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "dc403af1ede47a8979543800a6e15e36", "grade": true, "grade_id": "test-9-8-basic-acc", "locked": true, "points": 3, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-8-basic-acc - possible points: 3\n", "\n", "# NOTE: The hidden tests will evaluate the model on the unseen test set and check \n", "# if the accuracy is >= 0.55\n", "\n", "\n", "##### DO NOT CHANGE #####" ] 
}, { "cell_type": "code", "execution_count": null, "id": "da79bae9", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "67ff03c748a0899560f8f55b6163127a", "grade": true, "grade_id": "test-9-8-decent-acc", "locked": true, "points": 3, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-8-decent-acc - possible points: 3\n", "\n", "# NOTE: The hidden tests will evaluate the model on the unseen test set and check \n", "# if the accuracy is >= 0.70\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "code", "execution_count": null, "id": "ddd32fb0", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "code", "checksum": "cbb7bac737513ee5bccaea81d0af6598", "grade": true, "grade_id": "test-9-8-good-acc", "locked": true, "points": 3, "schema_version": 3, "solution": false, "task": false } }, "outputs": [], "source": [ "##### DO NOT CHANGE #####\n", "# ID: test-9-8-good-acc - possible points: 3\n", "\n", "# NOTE: The hidden tests will evaluate the model on the unseen test set and check \n", "# if the accuracy is >= 0.85\n", "\n", "\n", "##### DO NOT CHANGE #####" ] }, { "cell_type": "markdown", "id": "f6871c1d", "metadata": { "deletable": false, "editable": false, "nbgrader": { "cell_type": "markdown", "checksum": "63fb029dd19d102de57b5f452bb71ab8", "grade": false, "grade_id": "parting-message", "locked": true, "schema_version": 3, "solution": false, "task": false } }, "source": [ "👋 I hope you've found this exercise sheet helpful for your understanding of recurrent neural networks! The final free-form task was a bit of an experiment that deviated slightly from the usually more guided style of the exercises. 
It would be helpful if you could fill out the [Exercise Evaluation](https://ilias.studium.kit.edu/ilias.php?baseClass=ilObjSurveyGUI&ref_id=2359493&cmd=infoScreen) and let us know your opinion about more open-ended tasks like this!" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.2" } }, "nbformat": 4, "nbformat_minor": 5 }