Kubernetes API - Example of client-go Informers

Explore client-go Informers

SharedInformers provide hooks to receive notifications of adds, updates, and deletes for a particular resource. Because every consumer in a process shares one informer per resource, this saves connections against the API server, duplicate serialization costs server-side, and duplicate deserialization and caching costs controller-side.
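
To make the sharing idea concrete, here is a toy model in plain Go. It is not how client-go is implemented; it only illustrates one event source feeding a single cache and fanning each event out to every registered handler. The names toyInformer, event, addHandler, and dispatch are hypothetical and exist only for this sketch.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a watch notification.
type event struct {
	kind string // "add", "update" or "delete"
	name string // object name, used as the cache key
}

// toyInformer keeps one cache and a list of handlers that all share it.
type toyInformer struct {
	cache    map[string]bool
	handlers []func(event)
}

func newToyInformer() *toyInformer {
	return &toyInformer{cache: make(map[string]bool)}
}

// addHandler registers another consumer without opening a second event source.
func (i *toyInformer) addHandler(h func(event)) {
	i.handlers = append(i.handlers, h)
}

// dispatch updates the shared cache once, then notifies every handler.
func (i *toyInformer) dispatch(e event) {
	if e.kind == "delete" {
		delete(i.cache, e.name)
	} else {
		i.cache[e.name] = true
	}
	for _, h := range i.handlers {
		h(e)
	}
}

func main() {
	inf := newToyInformer()
	inf.addHandler(func(e event) { fmt.Println("controller A saw", e.kind, e.name) })
	inf.addHandler(func(e event) { fmt.Println("controller B saw", e.kind, e.name) })

	inf.dispatch(event{kind: "add", name: "nginx1"})
	inf.dispatch(event{kind: "delete", name: "nginx1"})
	fmt.Println("objects left in cache:", len(inf.cache))
}
```

Both handlers see every event, but the cache is updated only once per event, which is the saving the paragraph above describes.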

This program aims to demystify how a controller works with a small example that prints the name of each pod as it is added to the myns namespace.

Create a new directory and cd into it
cd $HOME
mkdir client-go-informers-example
cd client-go-informers-example
Initialize a Go module
go mod init client-go-informers-example
Create main.go, which accesses the cluster and watches for pods being added to the namespace
cat <<'EOF' > main.go
package main

import (
   "fmt"
   "log"

   corev1 "k8s.io/api/core/v1"
   "k8s.io/apimachinery/pkg/labels"
   "k8s.io/apimachinery/pkg/util/runtime"
   "k8s.io/client-go/informers"
   "k8s.io/client-go/kubernetes"
   "k8s.io/client-go/tools/cache"
   "k8s.io/client-go/tools/clientcmd"
)

func main() {
   log.Print("Shared Informer app started")
   config, err := clientcmd.BuildConfigFromFlags("", "<path-to-kubeconfig>")
   if err != nil {
      log.Panic(err.Error())
   }
   clientset, err := kubernetes.NewForConfig(config)
   if err != nil {
      log.Panic(err.Error())
   }

   // Create a channel used to stop the shared informer gracefully
   stopper := make(chan struct{})
   defer close(stopper)
    
   // Create the shared informer factory and use the client to connect to Kubernetes
   factory := informers.NewSharedInformerFactory(clientset, 0)
   
   // Get the informer for the right resource, in this case a Pod
   podInformer := factory.Core().V1().Pods()
   informer := podInformer.Informer()

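   // HandleCrash is a Kubernetes utility that logs a panic raised in this function
   defer runtime.HandleCrash()

   // Start the informers requested from the factory; Start launches them in
   // their own goroutines and returns immediately
   factory.Start(stopper)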

   // Wait for the initial list to populate the informer's cache
   if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
      runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
      return
   }
    
   // This is the part where your custom code gets triggered based on the
   // event that the shared informer catches
   informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: onAdd,  // register add eventhandler
      UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") },
      DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
   })

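   // List the pods in the myns namespace from the informer's local cache
   lister := podInformer.Lister().Pods("myns")
   pods, err := lister.List(labels.Everything())
   if err != nil {
      fmt.Println(err)
   }

   // Print the count and names; printing the slice itself shows pointer addresses
   fmt.Println("pods found in myns:", len(pods))
   for _, p := range pods {
      fmt.Println(" -", p.Name)
   }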

   // Block so the informer keeps running; stop the program with Ctrl+C
   <-stopper
}

// onAdd is the function executed when the Kubernetes informer notifies us of
// a new pod in the cluster
func onAdd(obj interface{}) {
   // Assert that obj is a *corev1.Pod; skip anything else instead of panicking
   mypod, ok := obj.(*corev1.Pod)
   if !ok {
      return
   }
   if mypod.Namespace == "myns" {
      fmt.Printf("A new pod is added to the namespace %s with the name %s\n", mypod.Namespace, mypod.Name)
   }
}
EOF
Edit main.go, replacing <path-to-kubeconfig> with /home/ec2-user/.kube/config so that the line reads
config, err := clientcmd.BuildConfigFromFlags("", "/home/ec2-user/.kube/config")
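
If you would rather not hard-code the path, the following is a minimal sketch of deriving it at run time. The helper resolveKubeconfig is hypothetical, and the sketch assumes that the KUBECONFIG environment variable, when set, holds a single file path rather than a list of paths.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// resolveKubeconfig is a hypothetical helper. It returns envValue when it is
// non-empty and otherwise falls back to <homeDir>/.kube/config.
// Assumption: envValue is a single path, not a list of paths.
func resolveKubeconfig(envValue, homeDir string) string {
	if envValue != "" {
		return envValue
	}
	return filepath.Join(homeDir, ".kube", "config")
}

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot determine home directory:", err)
		os.Exit(1)
	}
	// Prints the path that would be used as the kubeconfig location.
	fmt.Println(resolveKubeconfig(os.Getenv("KUBECONFIG"), home))
}
```

In main.go, the returned string could be passed as the second argument to clientcmd.BuildConfigFromFlags in place of the literal path.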
Before you can run the program, download the external modules it depends on
go mod tidy
Run the program
go run main.go

Observe the output. How many pods are listed?

In a new terminal window, create a new project named myns
oc new-project myns
Create a new pod in the myns namespace
kubectl run nginx1 --image=nginx --restart=Never --namespace myns
Switch to the terminal where the application is running.

Do you see output reporting that a new pod was added to the namespace?

Create another pod and observe the log again
kubectl run nginx2 --image=nginx --restart=Never --namespace myns
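
When you have finished, stop the program with Ctrl+C. As written, main.go blocks on the stopper channel forever, so the deferred close(stopper) never runs and the informer is not shut down gracefully. One possible way to get a stop channel that closes on Ctrl+C is sketched below, using only the Go standard library. The helper newStopChannel is hypothetical. Both factory.Start and cache.WaitForCacheSync take a receive-only <-chan struct{}, so a channel like this one could be passed in place of stopper.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// newStopChannel is a hypothetical helper. The returned channel is closed when
// the process receives SIGINT (Ctrl+C) or SIGTERM, or when the returned
// function is called, whichever happens first.
func newStopChannel() (<-chan struct{}, context.CancelFunc) {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	return ctx.Done(), cancel
}

func main() {
	stopCh, cancel := newStopChannel()
	defer cancel()

	fmt.Println("running, press Ctrl+C to stop")
	<-stopCh // in main.go this would take the place of the final <-stopper
	fmt.Println("shutting down")
}
```

With this approach the program returns from main normally after Ctrl+C, so deferred cleanup gets a chance to run.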