{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding New and Custom Extensions\n",
    "\n",
    "This tutorial will cover using the `PropertiesExtension` and `ExtensionManagementMixin` classes in `pystac.extensions.base` to implement a new extension to PySTAC.\n",
    "\n",
    "For this exercise, we will implement an imaginary Order Request Extension that allows us to track an internal order ID associated with a given satellite image, as well as the history of that imagery order. This use-case is specific enough that it would probably not be a good candidate for an actual STAC Extension, but it gives us an opportunity to highlight some of the key aspects and patterns used in implementing STAC Extensions in PySTAC."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we import the PySTAC modules and classes that we will be using throughout the tutorial."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime, timedelta\n",
    "from pprint import pprint\n",
    "from typing import Any, Dict, List, Optional, Union\n",
    "from uuid import uuid4\n",
    "\n",
    "import pystac\n",
    "from pystac.utils import (\n",
    "    StringEnum,\n",
    "    datetime_to_str,\n",
    "    get_required,\n",
    "    map_opt,\n",
    "    str_to_datetime,\n",
    ")\n",
    "from pystac.extensions.base import PropertiesExtension, ExtensionManagementMixin"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define the Extension"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Our extension will extend STAC Items by adding the following properties:\n",
    "\n",
    "- `order:id`: A unique string ID associated with the internal order for this image. This field will be required.\n",
    "- `order:history`: A chronological list of events associated with this order. Each of these \"events\" will have a timestamp and an event type, which will be one of the following: `submitted`, `started_processing`, `delivered`, `cancelled`. This field will be optional."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Extension Classes\n",
    "\n",
    "Let's start by creating a class to represent the order history events."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "class OrderEventType(StringEnum):\n",
    "    SUBMITTED = \"submitted\"\n",
    "    STARTED_PROCESSING = \"started_processing\"\n",
    "    DELIVERED = \"delivered\"\n",
    "    CANCELLED = \"cancelled\"\n",
    "\n",
    "\n",
    "class OrderEvent:\n",
    "    properties: Dict[str, Any]\n",
    "\n",
    "    def __init__(self, properties: Dict[str, Any]) -> None:\n",
    "        self.properties = properties\n",
    "\n",
    "    @property\n",
    "    def event_type(self) -> OrderEventType:\n",
    "        return get_required(self.properties.get(\"type\"), self, \"event_type\")\n",
    "\n",
    "    @event_type.setter\n",
    "    def event_type(self, v: OrderEventType) -> None:\n",
    "        self.properties[\"type\"] = str(v)\n",
    "\n",
    "    @property\n",
    "    def timestamp(self) -> datetime:\n",
    "        return str_to_datetime(\n",
    "            get_required(self.properties.get(\"timestamp\"), self, \"timestamp\")\n",
    "        )\n",
    "\n",
    "    @timestamp.setter\n",
    "    def timestamp(self, v: datetime) -> None:\n",
    "        self.properties[\"timestamp\"] = datetime_to_str(v)\n",
    "\n",
    "    def __repr__(self) -> str:\n",
    "        return \"<OrderEvent \" f\"type={self.event_type} \" f\"timestamp={self.timestamp}>\"\n",
    "\n",
    "    def apply(\n",
    "        self,\n",
    "        event_type: OrderEventType,\n",
    "        timestamp: datetime,\n",
    "    ) -> None:\n",
    "        self.event_type = event_type\n",
    "        self.timestamp = timestamp\n",
    "\n",
    "    @classmethod\n",
    "    def create(\n",
    "        cls,\n",
    "        event_type: OrderEventType,\n",
    "        timestamp: datetime,\n",
    "    ) -> \"OrderEvent\":\n",
    "        oe = cls({})\n",
    "        oe.apply(event_type=event_type, timestamp=timestamp)\n",
    "        return oe\n",
    "\n",
    "    def to_dict(self) -> Dict[str, Any]:\n",
    "        return self.properties"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A few important notes about how we constructed this:\n",
    "\n",
    "- We used PySTAC's [StringEnum class](https://pystac.readthedocs.io/en/latest/api/utils.html#pystac.utils.StringEnum), which inherits from the Python [Enum](https://docs.python.org/3/library/enum.html) class, to capture the allowed event type values. This class has built-in methods that will convert these instances to strings when serializing STAC Items to JSON.\n",
    "- We use property getters and setters to manipulate a `properties` dictionary in our `OrderEvent` class. We will see later how this pattern allows us to mutate Item property dictionaries in-place so that updates to the `OrderEvent` object are synced to the Item they extend.\n",
    "- The `timestamp` property is converted to a string before it is saved in the `properties` dictionary. This ensures that dictionary is always JSON-serializable but allows us to work with the values as a Python `datetime` instance when using the property getter.\n",
    "- We use `event_type` as our property name so that we do not shadow the built-in `type` function in the `apply` method. However, this values is stored under the desired `\"type\"` key in the underlying `properties` dictionary."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we will create a new class inheriting from `PropertiesExtension` and `ExtensionManagementMixin`. Since this class only extends `pystac.Item` instance, we do not need to make it [generic](https://docs.python.org/3/library/typing.html#typing.Generic). If you were creating an extension that applied to multiple object types (e.g. `pystac.Item` and `pystac.Asset`) then you would need to inherit from `typing.Generic` as well and create concrete extension classed for each of these object types (see the [EOExtension](https://github.com/stac-utils/pystac/blob/3c5176f178a4345cb50d5dab83f1dab504ed2682/pystac/extensions/eo.py#L279), [ItemEOExtension](https://github.com/stac-utils/pystac/blob/3c5176f178a4345cb50d5dab83f1dab504ed2682/pystac/extensions/eo.py#L385), and [AssetEOExtension](https://github.com/stac-utils/pystac/blob/3c5176f178a4345cb50d5dab83f1dab504ed2682/pystac/extensions/eo.py#L429) classes for an example of this implementation)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "SCHEMA_URI: str = \"https://example.com/image-order/v1.0.0/schema.json\"\n",
    "PREFIX: str = \"order:\"\n",
    "ID_PROP: str = PREFIX + \"id\"\n",
    "HISTORY_PROP: str = PREFIX + \"history\"\n",
    "\n",
    "\n",
    "class OrderExtension(\n",
    "    PropertiesExtension, ExtensionManagementMixin[Union[pystac.Item, pystac.Collection]]\n",
    "):\n",
    "    def __init__(self, item: pystac.Item):\n",
    "        self.item = item\n",
    "        self.properties = item.properties\n",
    "\n",
    "    def apply(\n",
    "        self, order_id: str = None, history: Optional[List[OrderEvent]] = None\n",
    "    ) -> None:\n",
    "        self.order_id = order_id\n",
    "        self.history = history\n",
    "\n",
    "    @property\n",
    "    def order_id(self) -> str:\n",
    "        return get_required(self._get_property(ID_PROP, str), self, ID_PROP)\n",
    "\n",
    "    @order_id.setter\n",
    "    def order_id(self, v: str) -> None:\n",
    "        self._set_property(ID_PROP, v, pop_if_none=False)\n",
    "\n",
    "    @property\n",
    "    def history(self) -> Optional[List[OrderEvent]]:\n",
    "        return map_opt(\n",
    "            lambda history: [OrderEvent(d) for d in history],\n",
    "            self._get_property(HISTORY_PROP, List[OrderEvent]),\n",
    "        )\n",
    "\n",
    "    @history.setter\n",
    "    def history(self, v: Optional[List[OrderEvent]]) -> None:\n",
    "        self._set_property(\n",
    "            HISTORY_PROP,\n",
    "            map_opt(lambda history: [event.to_dict() for event in history], v),\n",
    "            pop_if_none=True,\n",
    "        )\n",
    "\n",
    "    @classmethod\n",
    "    def get_schema_uri(cls) -> str:\n",
    "        return SCHEMA_URI\n",
    "\n",
    "    @classmethod\n",
    "    def ext(cls, obj: pystac.Item, add_if_missing: bool = False) -> \"OrderExtension\":\n",
    "        if isinstance(obj, pystac.Item):\n",
    "            cls.validate_has_extension(obj, add_if_missing)\n",
    "            return OrderExtension(obj)\n",
    "        else:\n",
    "            raise pystac.ExtensionTypeError(\n",
    "                f\"OrderExtension does not apply to type '{type(obj).__name__}'\"\n",
    "            )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As with the `OrderEvent` class, we use property getters and setters for our extension fields (the `PropertiesExtension` class has a `properties` attribute where these are stored). Rather than setting these values directly in the dictionary, we use the `_get_property` and `_set_property` methods that are built into the `PropertiesExtension` class). We also add an `ext` method that will be used to extend `pystac.Item` instances, and a `get_schema_uri` method that is required for all `PropertiesExtension` classes."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use the Extension\n",
    "\n",
    "Let's try using our new classes to extend an `Item` and access the extension properties. We'll start by loading the core Item example from the STAC spec examples [here](https://github.com/radiantearth/stac-spec/blob/master/examples/core-item.json) and printing the existing properties."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'title': 'Core Item',\n",
       " 'description': 'A sample STAC Item that includes examples of all common metadata',\n",
       " 'datetime': None,\n",
       " 'start_datetime': '2020-12-11T22:38:32.125Z',\n",
       " 'end_datetime': '2020-12-11T22:38:32.327Z',\n",
       " 'created': '2020-12-12T01:48:13.725Z',\n",
       " 'updated': '2020-12-12T01:48:13.725Z',\n",
       " 'platform': 'cool_sat1',\n",
       " 'instruments': ['cool_sensor_v1'],\n",
       " 'constellation': 'ion',\n",
       " 'mission': 'collection 5624',\n",
       " 'gsd': 0.512}"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "item = pystac.read_file(\n",
    "    \"https://raw.githubusercontent.com/radiantearth/stac-spec/master/examples/core-item.json\"\n",
    ")\n",
    "item.properties"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, let's verify that this Item does not implement our new Order Extension yet and that it does not already contain any of our Order Extension properties."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Implements Extension: False\n",
      "Order ID: None\n",
      "History:\n"
     ]
    }
   ],
   "source": [
    "print(f\"Implements Extension: {OrderExtension.has_extension(item)}\")\n",
    "print(f\"Order ID: {item.properties.get(ID_PROP)}\")\n",
    "print(\"History:\")\n",
    "for event in item.properties.get(HISTORY_PROP, []):\n",
    "    pprint(event)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As expected, this Item does not implement the extension (i.e. the schema URI is not in the Item's `stac_extensions` list). Let's add it, create an instance of `OrderExtension` that extends the `Item`, and add some values for our extension fields."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "order_ext = OrderExtension.ext(item, add_if_missing=True)\n",
    "\n",
    "# Create a unique string ID for the order ID\n",
    "order_ext.order_id = str(uuid4())\n",
    "\n",
    "# Create some fake order history and set it using the extension\n",
    "event_1 = OrderEvent.create(\n",
    "    event_type=OrderEventType.SUBMITTED, timestamp=datetime.now() - timedelta(days=1)\n",
    ")\n",
    "event_2 = OrderEvent.create(\n",
    "    event_type=OrderEventType.STARTED_PROCESSING,\n",
    "    timestamp=datetime.now() - timedelta(hours=12),\n",
    ")\n",
    "event_3 = OrderEvent.create(\n",
    "    event_type=OrderEventType.DELIVERED, timestamp=datetime.now() - timedelta(hours=1)\n",
    ")\n",
    "order_ext.history = [event_1, event_2, event_3]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's check to see if these values were written to our Item properties."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Implements Extension: True\n",
      "Order ID: f6f367b6-a787-48de-941f-08a04be77683\n",
      "History:\n",
      "{'timestamp': '2022-01-20T11:44:01.803820Z', 'type': 'submitted'}\n",
      "{'timestamp': '2022-01-20T23:44:01.803939Z', 'type': 'started_processing'}\n",
      "{'timestamp': '2022-01-21T10:44:01.803999Z', 'type': 'delivered'}\n"
     ]
    }
   ],
   "source": [
    "print(f\"Implements Extension: {OrderExtension.has_extension(item)}\")\n",
    "print(f\"Order ID: {item.properties.get(ID_PROP)}\")\n",
    "print(\"History:\")\n",
    "for event in item.properties.get(HISTORY_PROP, []):\n",
    "    pprint(event)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
