Hyperparameter Optimization with Ray Tune¶
In this guide we build on top of the Ray usage example with a Ray Tune example at the bottom. The following code block is from node_class example, see this example for more details.
Cora Dataset¶
>>> import tempfile
>>> from deepgnn.graph_engine.data.citation import Cora
>>> data_dir = tempfile.TemporaryDirectory()
>>> Cora(data_dir.name)
<deepgnn.graph_engine.data.citation.Cora object at 0x...>
GAT Model¶
Setup¶
>>> from typing import List, Tuple, Any, Dict
>>> from dataclasses import dataclass, field
>>> import os
>>> import numpy as np
>>> import torch
>>> import torch.nn as nn
>>> import torch.nn.functional as F
>>> import ray
>>> import ray.train as train
>>> from ray.train.torch import TorchTrainer
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig, RunConfig
>>> from ray import tune
>>> import deepgnn.pytorch
>>> from deepgnn.pytorch.nn.gat_conv import GATConv
>>> from deepgnn.graph_engine import Graph, graph_ops
>>> from deepgnn.pytorch.modeling import BaseModel
>>> from deepgnn.graph_engine.snark.distributed import Server, Client as DistributedClient
>>> from deepgnn.graph_engine.data.citation import Cora
Query¶
>>> @dataclass
... class GATQuery:
... feature_meta: list = field(default_factory=lambda: np.array([[0, 1433]]))
... label_meta: list = field(default_factory=lambda: np.array([[1, 1]]))
... feature_type: np.dtype = np.float32
... label_type: np.dtype = np.float32
... neighbor_edge_types: list = field(default_factory=lambda: [0])
... num_hops: int = 2
...
... def query(self, g: DistributedClient, idx: int) -> Dict[Any, np.ndarray]:
... """Query used to generate data for training."""
... if isinstance(idx, (int, float)):
... idx = [idx]
... inputs = np.array(idx, np.int64)
... nodes, edges, src_idx = graph_ops.sub_graph(
... g,
... inputs,
... edge_types=np.array(self.neighbor_edge_types, np.int64),
... num_hops=self.num_hops,
... self_loop=True,
... undirected=True,
... return_edges=True,
... )
... input_mask = np.zeros(nodes.size, np.bool)
... input_mask[src_idx] = True
...
... feat = g.node_features(nodes, self.feature_meta, self.feature_type)
... label = g.node_features(nodes, self.label_meta, self.label_type).astype(np.int64)
... return {"nodes": np.expand_dims(nodes, 0), "feat": np.expand_dims(feat, 0), "labels": np.expand_dims(label, 0), "input_mask": np.expand_dims(input_mask, 0), "edges": np.expand_dims(edges, 0)}
Model Forward and Init¶
>>> class GAT(nn.Module):
... def __init__(
... self,
... in_dim: int,
... head_num: List = [8, 1],
... hidden_dim: int = 8,
... num_classes: int = -1,
... ffd_drop: float = 0.0,
... attn_drop: float = 0.0,
... ):
... super().__init__()
... self.num_classes = num_classes
... self.out_dim = num_classes
...
... self.input_layer = GATConv(
... in_dim=in_dim,
... attn_heads=head_num[0],
... out_dim=hidden_dim,
... act=F.elu,
... in_drop=ffd_drop,
... coef_drop=attn_drop,
... attn_aggregate="concat",
... )
... layer0_output_dim = head_num[0] * hidden_dim
... assert len(head_num) == 2
... self.out_layer = GATConv(
... in_dim=layer0_output_dim,
... attn_heads=head_num[1],
... out_dim=self.out_dim,
... act=None,
... in_drop=ffd_drop,
... coef_drop=attn_drop,
... attn_aggregate="average",
... )
...
... def forward(self, context: Dict[Any, np.ndarray]):
... nodes = torch.squeeze(context["nodes"]) # [N], N: num of nodes in subgraph
... feat = torch.squeeze(context["feat"]) # [N, F]
... mask = torch.squeeze(context["input_mask"]) # [N]
... labels = torch.squeeze(context["labels"]) # [N]
... edges = torch.squeeze(context["edges"].reshape((-1, 2))) # [X, 2], X: num of edges in subgraph
...
... edges = np.transpose(edges)
...
... sp_adj = torch.sparse_coo_tensor(edges, torch.ones(edges.shape[1], dtype=torch.float32), (nodes.shape[0], nodes.shape[0]))
... h_1 = self.input_layer(feat, sp_adj)
... scores = self.out_layer(h_1, sp_adj)
...
... scores = scores[mask] # [batch_size]
... return scores
Ray Tune¶
First we define a standard torch training loop using the ray dataset.
>>> def train_func(config: Dict):
... # Set random seed
... train.torch.enable_reproducibility(seed=session.get_world_rank())
...
... # Start server
... address = "localhost:9999"
... s = Server(address, config["data_dir"], 0, 1)
... g = DistributedClient(address)
...
... # Initialize the model and wrap it with Ray
... model = GAT(in_dim=1433, num_classes=7)
... model = train.torch.prepare_model(model)
...
... # Initialize the optimizer and wrap it with Ray
... optimizer = torch.optim.Adam(model.parameters(), lr=config["learning_rate"], weight_decay=0.0005)
... optimizer = train.torch.prepare_optimizer(optimizer)
...
... # Define the loss function
... loss_fn = nn.CrossEntropyLoss()
...
... # Ray Dataset
... dataset = ray.data.range(2708).repartition(2708 // config["batch_size"]) # -> Dataset(num_blocks=6, num_rows=2708, schema=<class 'int'>)
... pipe = dataset.window(blocks_per_window=10).repeat(config["n_epochs"]) # -> DatasetPipeline(num_windows=1, num_stages=1)
... q = GATQuery()
... def transform_batch(batch: list) -> dict:
... return q.query(g, batch) # When we reference the server g in transform, it uses Client instead
... pipe = pipe.map_batches(transform_batch)
...
... # Execute the training loop
... model.train()
... for epoch, epoch_pipe in enumerate(pipe.iter_epochs()):
... epoch_pipe = epoch_pipe.random_shuffle_each_window()
... for i, batch in enumerate(epoch_pipe.iter_torch_batches(batch_size=config["batch_size"])):
... scores = model(batch)
... labels = batch["labels"][batch["input_mask"]].flatten()
... loss = loss_fn(scores.type(torch.float32), labels)
... optimizer.zero_grad()
... loss.backward()
... optimizer.step()
...
... session.report({"metric": (scores.argmax(1) == labels).float().mean().item()})
Now we define the objective function using this trainer. The objective function will take a set of parameters from the tuner and return a fitness value.
>>> ray.init(num_cpus=8)
RayContext(...)
>>> def objective(learning_rate, n_epochs):
... trainer = TorchTrainer(
... train_func,
... train_loop_config={
... "batch_size": 2708,
... "data_dir": data_dir.name,
... "sample_filename": "train.nodes",
... "n_epochs": n_epochs,
... "learning_rate": learning_rate,
... },
... run_config=RunConfig(verbose=0),
... scaling_config=ScalingConfig(num_workers=1, use_gpu=False),
... )
... result = trainer.fit()
... return result
This training function wraps this objective function for use in the tuner.
>>> def training_function(config):
... results = objective(config["learning_rate"], config["n_epochs"])
... tune.report(accuracy=results.metrics["metric"])
Finally we define and make use of the tuner. We use the hyperparameters learning_rate and n_epochs, with one training iteration per configuration.
See the `Ray Tune guides, here<https://docs.ray.io/en/latest/tune/tutorials/overview.html>`_.
>>> tuner = tune.Tuner(
... training_function,
... param_space={
... "learning_rate": tune.grid_search([.05, .005, .0005]),
... "n_epochs": tune.choice([2, 4])
... },
... tune_config=tune.TuneConfig(num_samples=1),
... run_config=RunConfig(
... stop={"training_iteration": 1},
... verbose=0,
... ),
... )
>>> analysis = tuner.fit()
>>> analysis.get_best_result(metric="accuracy", mode="max")
Result(metrics={'accuracy': ..., 'experiment_tag': '..._learning_rate=...,n_epochs=...'}, ...)
>>> analysis.get_dataframe()
accuracy ...
0 0.3...
1 0.3...
[3 rows x 21 columns]