Flannel 启动运行分析
注意: 基于 flannel-v0.12.0 版本
flannel 可以为容器提供网络服务。 其模型为全部的容器使用一个 network,然后在每个 host 上从 network 中划分一个子网 subnet。 为 host 上的容器创建网络时,从 subnet 中划分一个ip给容器。
其采用目前比较流行的 no server 的方式,即不存在所谓的控制节点,而是每个 host 上的 flanneld 从一个 etcd 中获取相关数据,然后声明自己的子网网段,并记录在 etcd 中。
其他的 host 对数据转发时,从 etcd 中查询到该子网所在的 host 的 ip,然后将数据发往对应 host 上的 flanneld,交由其进行转发。
根据 kubernetes 的模型,即为每个 pod 提供一个 ip。 flannel 的模型正好与之契合。
三大核心概念:
network:负责网络的管理(以后的方向是多网络模型,一个主机上同时存在多种网络模式),根据每个网络的配置调用 subnet;
subnet:负责和 etcd(或 Kubernetes API)交互,把其中的信息转换为 flannel 的子网数据结构,并对子网和网络进行监听;
backend:接受 subnet 的监听事件,负责增删相应的路由规则。
源文件分析
服务启动的核心代码在 main.go
声明配置
package main
// 导入程序依赖的库文件
// 这些库文件在 flannel 目录下,主要有如下几个目录:
// backend
// network
// pkg
// subnet
// vendor
// version
import ""
// 定义一个字符串类型的 Slice 切片: flagSlice
// Slice 切片:可变长度的数组
// flagSlice 实现了 flag.Value 接口(String/Set 方法),用于可以重复指定的参数 -iface 和 -iface-regex:参数每出现一次,其值就被追加到切片中
type flagSlice []string
func (t *flagSlice) String() string {...}
func (t *flagSlice) Set(val string) error {...}
// 定义一个结构体:CmdLineOpts
// 结构体:自定义的数据类型,其内容由用户自定义,可以同时包含数值、bool、字符串等基础数据类型
// CmdLineOpts: 定义 flanneld 进程启动时的命令行参数
type CmdLineOpts struct {...}
// 定义一些变量
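// opts: 使用自定义结构体声明的变量,用来保存 flanneld 进程启动时的命令行参数配置
// errInterrupted: 子网租约被撤销(收到 EventRemoved 事件)时由 MonitorLease 返回,main 收到后调用 cancel() 关闭所有 goroutine
// errCanceled: context 被取消(例如收到 SIGINT/SIGTERM)时由 getConfig 和 MonitorLease 返回,表示进入正常退出流程
// flannelFlags: flannel 自己的参数集合(flag.FlagSet),所有启动参数都注册在它上面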
var (
opts CmdLineOpts
errInterrupted = errors.New("interrupted")
errCanceled = errors.New("canceled")
flannelFlags = flag.NewFlagSet("flannel", flag.ExitOnError)
)
// 其他 func 函数及 main 函数
func FUN(){...}
func main(){
......
}
特定功能函数
在 main.go 文件中有以下函数提供功能:
func init() {
func copyFlag(name string) {
func usage() {
func newSubnetManager() (subnet.Manager, error) {
func main() {
func recycleIPTables(nw ip.IP4Net, lease *subnet.Lease) error {
func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {
func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) {
func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg *sync.WaitGroup) error {
func LookupExtIface(ifname string, ifregex string) (*backend.ExternalInterface, error) {
func WriteSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
func mustRunHealthz() {
func ReadCIDRFromSubnetFile(path string, CIDRKey string) ip.IP4Net {
main() 函数介绍
// 行数:184 ~ 187
// 判断 flanneld 运行时是否指定了 `-version` 参数
// 如果指定了,就输出一下版本信息并退出
if opts.version {...}
// 行数:189
// 对命令行中未显式指定的参数,从以 FLANNELD_ 为前缀的环境变量读取取值(参数名转为大写、'-' 换成 '_',例如 -iface 对应 FLANNELD_IFACE)
// 容器内没有 FLANNELD_ 开头的环境变量时(本例只有 FLANNEL_ARCH=amd64),该调用不会改变任何参数
flagutil.SetFlagsFromEnv(flannelFlags, "FLANNELD")
// 行数:191 ~ 195
// 判断 flanneld 运行时指定的 `-subnet-lease-renew-margin` 参数是否合法
// 该参数表示在子网租约到期前提前多少分钟续租(续租提前量),单位是分钟,默认是 60 分钟;它不是租约本身的长度
// 合法条件是 0 < margin < 24*60,即有效范围为 1 ~ 1439 的整数(与参数说明 "ranging from 1 to 1439" 一致);该值只在非 kube 子网管理器模式下由 MonitorLease 使用
if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 {
// 行数:197 ~ 239
// 获取节点的网卡和 IP 地址信息
var extIface *backend.ExternalInterface
var err error
if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
// 行数: 241 ~ 246
// 初始化子网配置
// 创建子网管理器(subnet.Manager):指定 --kube-subnet-mgr 时通过 Kubernetes API 获取子网(本文日志即为该模式),否则与 etcd 交互
sm, err := newSubnetManager()
// 行数:248 ~ 251
// 注册 SIGINT and SIGTERM信号监听,安装一个监听内核信号量的handler
log.Info("Installing signal handlers")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
// 行数:258 ~ 265
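// 创建可取消的主 context,并启动 shutdownHandler goroutine:收到 SIGINT/SIGTERM 后调用 cancel(),通知所有 goroutine 退出
// wg(sync.WaitGroup)用来等待这些 goroutine 结束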
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
shutdownHandler(ctx, sigs, cancel)
wg.Done()
}()
// 行数:267 ~ 270
// 根据 flanneld 程序运行时指定的端口配置 `-healthz-port` 判断是否合法,要求端口大于 0
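// 端口大于 0 时,在后台 goroutine 中启动 HTTP 健康检查服务(路径 /healthz,监听地址 -healthz-ip:-healthz-port,IP 默认 0.0.0.0)
// 若该 HTTP 服务启动失败(例如端口被占用),mustRunHealthz 会 panic,导致整个进程退出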
if opts.healthzPort > 0 {
// 行数:272 ~ 277
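// 通过子网管理器获取网络配置(网段 Network、backend 类型等),保存到 config 变量中,给后续使用
// 获取失败时每秒重试一次,直到成功或 ctx 被取消
// 注意:-subnet-file(默认 /run/flannel/subnet.env)是 flanneld 写出子网信息的输出文件,并不是网络配置的来源;kube 模式下 -net-config-path(默认 /etc/kube-flannel/net-conf.json)会传给 kube 子网管理器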
config, err := getConfig(ctx, sm)
// 行数:279 ~ 295
// 创建 backend 控制器,例如使用 VxLAN 方式
bm := backend.NewManager(ctx, sm, extIface)
// 行数:297 ~ 307
// 根据 flanneld 进程参数判断是否需要配置 ipMasq
if opts.ipMasq {
// 行数:309 ~ 315
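// 向 iptables FORWARD 链添加放行 flannel 网段的 ACCEPT 规则,因为 Docker 1.13 及以后版本把 FORWARD 链的默认策略设为 DROP
// (日志虽然打印 "Changing default FORWARD chain policy to ACCEPT",但从日志看实际是添加 -s/-d 10.244.0.0/16 -j ACCEPT 规则)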
if opts.iptablesForwardRules {
// 行数:317 ~ 322
// 将获取的子网信息,即分配给节点的网段信息保存到宿主机文件系统上
if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
// 行数:324 ~ 330
// 运行一个 backend 网络后台服务,比如 VxLAN 网络
// 使用 goroutine 的协程方式后台运行,会一直在后端运行
bn.Run(ctx)
// 行数: 332
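// 通过 sd_notify 协议通知 systemd 服务已就绪(READY=1)
// 若进程不是由 systemd 启动(未设置 NOTIFY_SOCKET 环境变量,例如本文在 Kubernetes Pod 中运行的情况),该调用什么也不做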
daemon.SdNotify(false, "READY=1")
// 行数:334 ~ 341
// 非 kube 子网管理器模式下,调用 MonitorLease 在租约到期前续租,并监听租约是否被撤销(撤销时调用 cancel() 关闭所有 goroutine)
// kube 子网管理器不会为该节点租用子网,它直接使用节点已分配的 podCIDR,因此跳过这一步
if !opts.kubeSubnetMgr {
// 行数: 343 ~ 347
// 等待所有 goroutine(包括后端)运行结束后退出
wg.Wait()
运行和日志跟踪源码
flannel 服务的启动的进程信息如下:
[root@wenyu-test mygotest]# docker top a2a3de8fd1bb
UID PID PPID C STIME TTY TIME CMD
root 26339 26319 0 Apr26 ? 00:01:14 /opt/bin/flanneld --ip-masq --kube-subnet-mgr
基于 flannel 在 Kubernetes 集群启动运行日志进行分析,日志内容如下:
[root@wenyu-test ~]# kubectl -n kube-system logs kube-flannel-ds-amd64-pprxl
I0426 07:03:41.276084 1 main.go:518] Determining IP address of default interface
I0426 07:03:41.276338 1 main.go:531] Using interface with name eth0 and address 172.20.0.19
I0426 07:03:41.276354 1 main.go:548] Defaulting external address to interface address (172.20.0.19)
W0426 07:03:41.276362 1 client_config.go:517] Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.
I0426 07:03:41.284547 1 kube.go:119] Waiting 10m0s for node controller to sync
I0426 07:03:41.284633 1 kube.go:306] Starting kube subnet manager
I0426 07:03:42.284763 1 kube.go:126] Node controller sync successful
I0426 07:03:42.284796 1 main.go:246] Created subnet manager: Kubernetes Subnet Manager - wenyu-test
I0426 07:03:42.284800 1 main.go:249] Installing signal handlers
I0426 07:03:42.284924 1 main.go:390] Found network config - Backend type: vxlan
I0426 07:03:42.284971 1 vxlan.go:121] VXLAN config: VNI=1 Port=0 GBP=false Learning=false DirectRouting=false
I0426 07:03:42.373504 1 main.go:355] Current network or subnet (10.244.0.0/16, 10.244.0.0/24) is not equal to previous one (0.0.0.0/0, 0.0.0.0/0), trying to recycle old iptables rules
I0426 07:03:42.375094 1 iptables.go:167] Deleting iptables rule: -s 0.0.0.0/0 -d 0.0.0.0/0 -j RETURN
I0426 07:03:42.380009 1 iptables.go:167] Deleting iptables rule: -s 0.0.0.0/0 ! -d 224.0.0.0/4 -j MASQUERADE --random-fully
I0426 07:03:42.380747 1 iptables.go:167] Deleting iptables rule: ! -s 0.0.0.0/0 -d 0.0.0.0/0 -j RETURN
I0426 07:03:42.381462 1 iptables.go:167] Deleting iptables rule: ! -s 0.0.0.0/0 -d 0.0.0.0/0 -j MASQUERADE --random-fully
I0426 07:03:42.382118 1 main.go:305] Setting up masking rules
I0426 07:03:42.473738 1 main.go:313] Changing default FORWARD chain policy to ACCEPT
I0426 07:03:42.473822 1 main.go:321] Wrote subnet file to /run/flannel/subnet.env
I0426 07:03:42.473828 1 main.go:325] Running backend.
I0426 07:03:42.473834 1 main.go:343] Waiting for all goroutines to exit
I0426 07:03:42.473884 1 vxlan_network.go:60] watching for new subnet leases
I0426 07:03:42.678139 1 iptables.go:145] Some iptables rules are missing; deleting and recreating rules
I0426 07:03:42.678165 1 iptables.go:167] Deleting iptables rule: -s 10.244.0.0/16 -j ACCEPT
I0426 07:03:42.678232 1 iptables.go:145] Some iptables rules are missing; deleting and recreating rules
I0426 07:03:42.678242 1 iptables.go:167] Deleting iptables rule: -s 10.244.0.0/16 -d 10.244.0.0/16 -j RETURN
I0426 07:03:42.678814 1 iptables.go:167] Deleting iptables rule: -d 10.244.0.0/16 -j ACCEPT
I0426 07:03:42.679422 1 iptables.go:167] Deleting iptables rule: -s 10.244.0.0/16 ! -d 224.0.0.0/4 -j MASQUERADE --random-fully
I0426 07:03:42.679423 1 iptables.go:155] Adding iptables rule: -s 10.244.0.0/16 -j ACCEPT
I0426 07:03:42.680072 1 iptables.go:167] Deleting iptables rule: ! -s 10.244.0.0/16 -d 10.244.0.0/24 -j RETURN
I0426 07:03:42.680663 1 iptables.go:167] Deleting iptables rule: ! -s 10.244.0.0/16 -d 10.244.0.0/16 -j MASQUERADE --random-fully
I0426 07:03:42.773408 1 iptables.go:155] Adding iptables rule: -s 10.244.0.0/16 -d 10.244.0.0/16 -j RETURN
I0426 07:03:42.874301 1 iptables.go:155] Adding iptables rule: -d 10.244.0.0/16 -j ACCEPT
I0426 07:03:42.974331 1 iptables.go:155] Adding iptables rule: -s 10.244.0.0/16 ! -d 224.0.0.0/4 -j MASQUERADE --random-fully
I0426 07:03:42.975906 1 iptables.go:155] Adding iptables rule: ! -s 10.244.0.0/16 -d 10.244.0.0/24 -j RETURN
I0426 07:03:42.977501 1 iptables.go:155] Adding iptables rule: ! -s 10.244.0.0/16 -d 10.244.0.0/16 -j MASQUERADE --random-fully
代码运行流程顺序
步骤1: 初始化及参数解析
- 版本信息
- 参数合法判断
步骤2:获取节点的网卡和 IP 地址信息
// 行数:197 ~ 239 // 获取节点的网卡和 IP 地址信息 var extIface *backend.ExternalInterface var err error if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
步骤3:初始化子网管理器
// 行数: 241 ~ 246 // 初始化子网配置 // 一个主机是一个子网,负责与 etcd 交互 sm, err := newSubnetManager()
步骤4:创建可取消的 context 并安装信号处理(ctx, cancel := context.WithCancel(context.Background()))
步骤5:读取配置
// 行数:272 ~ 277
// 通过子网管理器获取网络配置(网段 Network、backend 类型等),保存到 config 变量中,给后续使用
// 获取失败时每秒重试一次,直到成功或 ctx 被取消
// 注意:-subnet-file(默认 /run/flannel/subnet.env)是 flanneld 写出子网信息的输出文件,并不是网络配置的来源
config, err := getConfig(ctx, sm)
步骤6:修改iptables规则
// 行数:309 ~ 315 // 修改 iptables 默认规则,因为 Docker 在 1.13 版本以后,iptables 的 FORWARD 默认规则为 DROP if opts.iptablesForwardRules {
步骤7:将子网信息写入本地文件(-subnet-file)
// 行数:317 ~ 322 // 将获取的子网信息,即分配给节点的网段信息保存到宿主机文件系统上 if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
步骤8:使用一个 backend 运行服务
// 行数:324 ~ 330 // 运行一个 backend 网络后台服务,比如 VxLAN 网络 // 使用 goroutine 的协程方式后台运行,会一直在后端运行 bn.Run(ctx)
源代码
Github 文件 https://raw.githubusercontent.com/flannel-io/flannel/v0.12.0/main.go
文件内容(注意文件复制出来后,内容与日志中的行数可能不对应):
// Copyright 2015 flannel authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"errors"
"flag"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"github.com/coreos/pkg/flagutil"
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/coreos/flannel/network"
"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/subnet"
"github.com/coreos/flannel/subnet/etcdv2"
"github.com/coreos/flannel/subnet/kube"
"github.com/coreos/flannel/version"
"time"
"github.com/joho/godotenv"
"sync"
// Backends need to be imported for their init() to get executed and them to register
"github.com/coreos/flannel/backend"
_ "github.com/coreos/flannel/backend/alivpc"
_ "github.com/coreos/flannel/backend/alloc"
_ "github.com/coreos/flannel/backend/awsvpc"
_ "github.com/coreos/flannel/backend/extension"
_ "github.com/coreos/flannel/backend/gce"
_ "github.com/coreos/flannel/backend/hostgw"
_ "github.com/coreos/flannel/backend/ipip"
_ "github.com/coreos/flannel/backend/ipsec"
_ "github.com/coreos/flannel/backend/udp"
_ "github.com/coreos/flannel/backend/vxlan"
"github.com/coreos/go-systemd/daemon"
)
// flagSlice is a repeatable string flag: every occurrence on the command line
// appends one value. It implements flag.Value (String and Set) and backs the
// -iface and -iface-regex options.
type flagSlice []string

// String renders the collected values in fmt's default slice form, e.g. "[a b]".
func (fs *flagSlice) String() string {
	values := []string(*fs)
	return fmt.Sprintf("%v", values)
}

// Set appends val to the collected values. It never fails.
func (fs *flagSlice) Set(val string) error {
	grown := append(*fs, val)
	*fs = grown
	return nil
}
// CmdLineOpts collects every flanneld command-line option. Most fields are
// bound to flags in init(); help, subnetDir, charonExecutablePath and
// charonViciUri are not registered there.
type CmdLineOpts struct {
	// etcd connection settings (used when kubeSubnetMgr is false).
	etcdEndpoints, etcdPrefix             string
	etcdKeyfile, etcdCertfile, etcdCAFile string
	etcdUsername, etcdPassword            string

	help, version bool

	// Kubernetes subnet manager settings.
	kubeSubnetMgr                                    bool
	kubeApiUrl, kubeAnnotationPrefix, kubeConfigFile string

	// Interface selection; both flags may be given multiple times.
	iface, ifaceRegex flagSlice

	ipMasq                          bool
	subnetFile, subnetDir, publicIP string
	subnetLeaseRenewMargin          int

	// healthz HTTP endpoint.
	healthzIP   string
	healthzPort int

	charonExecutablePath, charonViciUri string

	// iptables maintenance and network config location.
	iptablesResyncSeconds int
	iptablesForwardRules  bool
	netConfPath           string
}
// opts receives the parsed command-line options (populated in init()).
var opts CmdLineOpts

// Sentinel errors describing why a long-running loop stopped:
// errInterrupted is returned by MonitorLease when the lease is revoked;
// errCanceled is returned by getConfig and MonitorLease when the main context
// is canceled.
var errInterrupted = errors.New("interrupted")
var errCanceled = errors.New("canceled")

// flannelFlags is flanneld's own flag set; ExitOnError makes parse failures
// terminate the process.
var flannelFlags = flag.NewFlagSet("flannel", flag.ExitOnError)
// init registers every flanneld flag on flannelFlags, re-exports a few glog
// flags, and parses os.Args. Because parsing happens here, opts is populated
// from the command line before main runs (FLANNELD_* environment overrides
// are applied later, in main).
//
// Note: the CmdLineOpts fields help, subnetDir, charonExecutablePath and
// charonViciUri are not bound to any flag here.
func init() {
	// etcd options; only used when -kube-subnet-mgr is not set (see newSubnetManager).
	flannelFlags.StringVar(&opts.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
	flannelFlags.StringVar(&opts.etcdPrefix, "etcd-prefix", "/coreos.com/network", "etcd prefix")
	flannelFlags.StringVar(&opts.etcdKeyfile, "etcd-keyfile", "", "SSL key file used to secure etcd communication")
	flannelFlags.StringVar(&opts.etcdCertfile, "etcd-certfile", "", "SSL certification file used to secure etcd communication")
	flannelFlags.StringVar(&opts.etcdCAFile, "etcd-cafile", "", "SSL Certificate Authority file used to secure etcd communication")
	flannelFlags.StringVar(&opts.etcdUsername, "etcd-username", "", "username for BasicAuth to etcd")
	flannelFlags.StringVar(&opts.etcdPassword, "etcd-password", "", "password for BasicAuth to etcd")
	// Repeatable flags: each occurrence is appended to the flagSlice.
	flannelFlags.Var(&opts.iface, "iface", "interface to use (IP or name) for inter-host communication. Can be specified multiple times to check each option in order. Returns the first match found.")
	flannelFlags.Var(&opts.ifaceRegex, "iface-regex", "regex expression to match the first interface to use (IP or name) for inter-host communication. Can be specified multiple times to check each regex in order. Returns the first match found. Regexes are checked after specific interfaces specified by the iface option have already been checked.")
	flannelFlags.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
	flannelFlags.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
	flannelFlags.IntVar(&opts.subnetLeaseRenewMargin, "subnet-lease-renew-margin", 60, "subnet lease renewal margin, in minutes, ranging from 1 to 1439")
	flannelFlags.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
	flannelFlags.BoolVar(&opts.kubeSubnetMgr, "kube-subnet-mgr", false, "contact the Kubernetes API for subnet assignment instead of etcd.")
	flannelFlags.StringVar(&opts.kubeApiUrl, "kube-api-url", "", "Kubernetes API server URL. Does not need to be specified if flannel is running in a pod.")
	flannelFlags.StringVar(&opts.kubeAnnotationPrefix, "kube-annotation-prefix", "flannel.alpha.coreos.com", `Kubernetes annotation prefix. Can contain single slash "/", otherwise it will be appended at the end.`)
	flannelFlags.StringVar(&opts.kubeConfigFile, "kubeconfig-file", "", "kubeconfig file location. Does not need to be specified if flannel is running in a pod.")
	flannelFlags.BoolVar(&opts.version, "version", false, "print version and exit")
	flannelFlags.StringVar(&opts.healthzIP, "healthz-ip", "0.0.0.0", "the IP address for healthz server to listen")
	flannelFlags.IntVar(&opts.healthzPort, "healthz-port", 0, "the port for healthz server to listen(0 to disable)")
	flannelFlags.IntVar(&opts.iptablesResyncSeconds, "iptables-resync", 5, "resync period for iptables rules, in seconds")
	flannelFlags.BoolVar(&opts.iptablesForwardRules, "iptables-forward-rules", true, "add default accept rules to FORWARD chain in iptables")
	flannelFlags.StringVar(&opts.netConfPath, "net-config-path", "/etc/kube-flannel/net-conf.json", "path to the network configuration file")

	// glog will log to tmp files by default. override so all entries
	// can flow into journald (if running under systemd)
	// (The error returned by flag.Set is ignored.)
	flag.Set("logtostderr", "true")

	// Only copy the non file logging options from glog
	copyFlag("v")
	copyFlag("vmodule")
	copyFlag("log_backtrace_at")

	// Define the usage function
	flannelFlags.Usage = usage

	// now parse command line args
	// NOTE(review): usage() calls os.Exit(0), and the flag package calls Usage
	// on a parse error, so an invalid flag ends the process with status 0
	// rather than a failure status. Confirm this is intended.
	flannelFlags.Parse(os.Args[1:])
}
// copyFlag re-registers the flag called name from the global flag set
// (flag.CommandLine, where the glog options copied in init() live) on
// flannelFlags. Both registrations share the same flag.Value, so parsing
// flannelFlags updates the original setting.
//
// If name is not defined on flag.CommandLine, copyFlag panics with a
// descriptive message instead of a bare nil-pointer dereference.
func copyFlag(name string) {
	f := flag.Lookup(name)
	if f == nil {
		panic(fmt.Sprintf("copyFlag: flag %q is not defined on flag.CommandLine", name))
	}
	flannelFlags.Var(f.Value, f.Name, f.Usage)
}
// usage writes a one-line synopsis followed by every flanneld flag and its
// default value to stderr, then terminates the process with exit status 0.
func usage() {
	out := os.Stderr
	prog := os.Args[0]
	fmt.Fprintf(out, "Usage: %s [OPTION]...\n", prog)
	flannelFlags.PrintDefaults()
	os.Exit(0)
}
// newSubnetManager builds the subnet.Manager selected on the command line:
// the Kubernetes-API-backed manager when -kube-subnet-mgr is set, otherwise a
// local manager backed by etcd (v2 client).
//
// In the etcd case the FLANNEL_SUBNET value recorded in -subnet-file is passed
// along so the manager can try to renew that same lease.
func newSubnetManager() (subnet.Manager, error) {
	if !opts.kubeSubnetMgr {
		etcdCfg := &etcdv2.EtcdConfig{
			Endpoints: strings.Split(opts.etcdEndpoints, ","),
			Keyfile:   opts.etcdKeyfile,
			Certfile:  opts.etcdCertfile,
			CAFile:    opts.etcdCAFile,
			Prefix:    opts.etcdPrefix,
			Username:  opts.etcdUsername,
			Password:  opts.etcdPassword,
		}
		// Attempt to renew the lease for the subnet specified in the subnetFile
		previous := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")
		return etcdv2.NewLocalManager(etcdCfg, previous)
	}
	return kube.NewSubnetManager(opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
}
// main is flanneld's entry point; flags were already parsed by init().
// It selects the external interface, creates the subnet manager, fetches the
// network config, registers the network with the configured backend, installs
// iptables rules, writes the subnet file, runs the backend and, unless the
// Kubernetes subnet manager is used, keeps the subnet lease renewed. It exits
// once every goroutine tracked by wg has returned.
func main() {
	// -version: print the version to stderr and exit immediately.
	if opts.version {
		fmt.Fprintln(os.Stderr, version.Version)
		os.Exit(0)
	}

	// Fill flags not set on the command line from FLANNELD_* environment
	// variables. The returned error is ignored.
	flagutil.SetFlagsFromEnv(flannelFlags, "FLANNELD")

	// Validate flags
	if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 {
		log.Error("Invalid subnet-lease-renew-margin option, out of acceptable range")
		os.Exit(1)
	}

	// Work out which interface to use
	var extIface *backend.ExternalInterface
	var err error
	// Check the default interface only if no interfaces are specified
	// (if -public-ip is set, LookupExtIface looks up the interface owning that IP).
	if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
		extIface, err = LookupExtIface(opts.publicIP, "")
		if err != nil {
			log.Error("Failed to find any valid interface to use: ", err)
			os.Exit(1)
		}
	} else {
		// Check explicitly specified interfaces
		for _, iface := range opts.iface {
			extIface, err = LookupExtIface(iface, "")
			if err != nil {
				log.Infof("Could not find valid interface matching %s: %s", iface, err)
			}

			if extIface != nil {
				break
			}
		}

		// Check interfaces that match any specified regexes
		if extIface == nil {
			for _, ifaceRegex := range opts.ifaceRegex {
				extIface, err = LookupExtIface("", ifaceRegex)
				if err != nil {
					log.Infof("Could not find valid interface matching %s: %s", ifaceRegex, err)
				}

				if extIface != nil {
					break
				}
			}
		}

		if extIface == nil {
			// Exit if any of the specified interfaces do not match
			log.Error("Failed to find interface to use that matches the interfaces and/or regexes provided")
			os.Exit(1)
		}
	}

	sm, err := newSubnetManager()
	if err != nil {
		log.Error("Failed to create SubnetManager: ", err)
		os.Exit(1)
	}
	log.Infof("Created subnet manager: %s", sm.Name())

	// Register for SIGINT and SIGTERM
	log.Info("Installing signal handlers")
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	// This is the main context that everything should run in.
	// All spawned goroutines should exit when cancel is called on this context.
	// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
	// to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
	// blocking and returning only when cancel() is called.
	ctx, cancel := context.WithCancel(context.Background())
	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		shutdownHandler(ctx, sigs, cancel)
		wg.Done()
	}()

	// The healthz goroutine is not tracked by wg.
	if opts.healthzPort > 0 {
		// It's not super easy to shutdown the HTTP server so don't attempt to stop it cleanly
		go mustRunHealthz()
	}

	// Fetch the network config (i.e. what backend to use etc..).
	// getConfig retries until it succeeds; it only fails with errCanceled.
	config, err := getConfig(ctx, sm)
	if err == errCanceled {
		wg.Wait()
		os.Exit(0)
	}

	// Create a backend manager then use it to create the backend and register the network with it.
	bm := backend.NewManager(ctx, sm, extIface)
	be, err := bm.GetBackend(config.BackendType)
	if err != nil {
		log.Errorf("Error fetching backend: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	// NOTE(review): wg is passed by value here (compare &wg passed to
	// MonitorLease below), so Add/Done calls the backend makes on its
	// parameter would not be seen by main's wg.Wait(). Presumably this matches
	// the Backend.RegisterNetwork signature in this version; confirm.
	bn, err := be.RegisterNetwork(ctx, wg, config)
	if err != nil {
		log.Errorf("Error registering network: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	// Set up ipMasq if needed
	// First remove masquerade rules left over from a previous network/subnet,
	// then keep the current rules in place. The SetupAndEnsureIPTables goroutine
	// takes no ctx and is not tracked by wg.
	if opts.ipMasq {
		if err = recycleIPTables(config.Network, bn.Lease()); err != nil {
			log.Errorf("Failed to recycle IPTables rules, %v", err)
			cancel()
			wg.Wait()
			os.Exit(1)
		}
		log.Infof("Setting up masking rules")
		go network.SetupAndEnsureIPTables(network.MasqRules(config.Network, bn.Lease()), opts.iptablesResyncSeconds)
	}

	// Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
	// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
	// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
	// NOTE(review): despite the log text, this installs ACCEPT rules for the
	// flannel network (see network.ForwardRules) rather than changing the
	// chain policy itself; this goroutine is also not tracked by wg.
	if opts.iptablesForwardRules {
		log.Infof("Changing default FORWARD chain policy to ACCEPT")
		go network.SetupAndEnsureIPTables(network.ForwardRules(config.Network.String()), opts.iptablesResyncSeconds)
	}

	if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
		// Continue, even though it failed.
		log.Warningf("Failed to write subnet file: %s", err)
	} else {
		log.Infof("Wrote subnet file to %s", opts.subnetFile)
	}

	// Start "Running" the backend network. This will block until the context is done so run in another goroutine.
	log.Info("Running backend.")
	wg.Add(1)
	go func() {
		bn.Run(ctx)
		wg.Done()
	}()

	// Tell systemd (sd_notify protocol) that startup is complete; the result is ignored.
	daemon.SdNotify(false, "READY=1")

	// Kube subnet mgr doesn't lease the subnet for this node - it just uses the podCidr that's already assigned.
	// MonitorLease blocks until the lease is revoked (errInterrupted) or ctx is canceled (errCanceled).
	if !opts.kubeSubnetMgr {
		err = MonitorLease(ctx, sm, bn, &wg)
		if err == errInterrupted {
			// The lease was "revoked" - shut everything down
			cancel()
		}
	}

	log.Info("Waiting for all goroutines to exit")
	// Block waiting for all the goroutines to finish.
	wg.Wait()
	log.Info("Exiting cleanly...")
	os.Exit(0)
}
// recycleIPTables deletes the masquerade rules generated for the network and
// subnet recorded in the previous subnet file (-subnet-file), so stale rules
// do not linger after the configuration changes. It returns the error from
// deleting those rules, or nil when nothing needs recycling.
//
// NOTE(review): the log message talks about the network *or* the subnet
// differing, but rules are only recycled when *both* differ. Also,
// WriteSubnetFile stores FLANNEL_SUBNET as the first usable address (lease
// IP + 1), so the value read back presumably never equals lease.Subnet. In
// practice the decision therefore hinges on the network. Confirm the intended
// semantics before changing the condition.
func recycleIPTables(nw ip.IP4Net, lease *subnet.Lease) error {
	oldNetwork := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK")
	oldSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

	// Nothing to recycle if either value still matches the current one.
	if oldNetwork == nw || oldSubnet == lease.Subnet {
		return nil
	}

	log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules", nw, lease.Subnet, oldNetwork, oldSubnet)
	staleLease := &subnet.Lease{
		Subnet: oldSubnet,
	}
	return network.DeleteIPTables(network.MasqRules(oldNetwork, staleLease))
}
// shutdownHandler blocks until either ctx is done or a signal arrives on sigs.
// A signal triggers cancel(), which cancels the main context shared by every
// other goroutine. In both cases it then stops signal delivery to sigs, so a
// further SIGINT/SIGTERM falls back to the default OS action (assuming no
// other channel is registered for it).
func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {
	// Unregister to get default OS nuke behaviour in case we don't exit cleanly
	defer signal.Stop(sigs)

	select {
	case <-sigs:
		// Call cancel on the context to close everything down.
		cancel()
		log.Info("shutdownHandler sent cancel signal...")
	case <-ctx.Done():
		log.Info("Stopping shutdownHandler...")
	}
}
// getConfig fetches the network configuration (network CIDR, backend type, ...)
// from the subnet manager. It retries once per second until a config is
// returned, and returns errCanceled if ctx is canceled first. It never returns
// any other error.
func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) {
	// Retry every second until it succeeds
	for {
		config, err := sm.GetNetworkConfig(ctx)
		switch {
		case err != nil:
			log.Errorf("Couldn't fetch network config: %s", err)
		case config == nil:
			// err is nil on this path; formatting it would log "%!s(<nil>)".
			log.Warning("Couldn't find network config")
		default:
			log.Infof("Found network config - Backend type: %s", config.BackendType)
			return config, nil
		}

		// Wait before retrying. Failures are already logged through glog, so
		// nothing extra is printed to stdout here.
		select {
		case <-ctx.Done():
			return nil, errCanceled
		case <-time.After(1 * time.Second):
		}
	}
}
// MonitorLease keeps this node's subnet lease alive. It starts a goroutine
// (tracked by wg) that watches the lease through subnet.WatchLease. It renews
// the lease -subnet-lease-renew-margin minutes before each expiration.
//
// A failed renewal is logged and retried after one minute. An EventAdded for
// the lease updates the local expiration and reschedules the renewal.
// It returns errInterrupted when the lease is removed (revoked), and
// errCanceled when ctx is canceled.
func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg *sync.WaitGroup) error {
	// Use the subnet manager to start watching leases.
	leaseEvents := make(chan subnet.Event)

	wg.Add(1)
	go func() {
		subnet.WatchLease(ctx, sm, bn.Lease().Subnet, leaseEvents)
		wg.Done()
	}()

	margin := time.Duration(opts.subnetLeaseRenewMargin) * time.Minute
	// untilRenewal is the time left before the lease must be renewed.
	untilRenewal := func() time.Duration {
		return time.Until(bn.Lease().Expiration) - margin
	}

	wait := untilRenewal()
	for {
		select {
		case <-ctx.Done():
			log.Infof("Stopped monitoring lease")
			return errCanceled

		case <-time.After(wait):
			if err := sm.RenewLease(ctx, bn.Lease()); err != nil {
				log.Error("Error renewing lease (trying again in 1 min): ", err)
				wait = time.Minute
				continue
			}
			log.Info("Lease renewed, new expiration: ", bn.Lease().Expiration)
			wait = untilRenewal()

		case evt := <-leaseEvents:
			if evt.Type == subnet.EventAdded {
				bn.Lease().Expiration = evt.Lease.Expiration
				wait = untilRenewal()
				log.Infof("Waiting for %s to renew lease", wait)
			} else if evt.Type == subnet.EventRemoved {
				log.Error("Lease has been revoked. Shutting down daemon.")
				return errInterrupted
			}
		}
	}
}
// LookupExtIface selects the host interface flannel uses for inter-host
// traffic. It returns the interface, its IPv4 address, and the external
// address to advertise to other nodes.
//
// Selection rules:
//   - ifname non-empty: if it parses as an IP, the interface owning that IP;
//     otherwise the interface with that name.
//   - else ifregex non-empty: the first interface whose IPv4 address matches
//     the regex, or failing that the first interface whose name matches.
//   - else the interface of the default gateway.
//
// The external address is -public-ip when set, otherwise the interface address.
// An error is returned for an unknown interface, an invalid regex, no match,
// a missing IPv4 address, an unknown MTU, or an unparsable -public-ip.
func LookupExtIface(ifname string, ifregex string) (*backend.ExternalInterface, error) {
	var iface *net.Interface
	var ifaceAddr net.IP
	var err error

	if len(ifname) > 0 {
		if ifaceAddr = net.ParseIP(ifname); ifaceAddr != nil {
			log.Infof("Searching for interface using %s", ifaceAddr)
			iface, err = ip.GetInterfaceByIP(ifaceAddr)
			if err != nil {
				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
			}
		} else {
			iface, err = net.InterfaceByName(ifname)
			if err != nil {
				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
			}
		}
	} else if len(ifregex) > 0 {
		// Use the regex if specified and the iface option for matching a specific ip or name is not used.
		// Compile once up front so an invalid pattern is reported with the
		// parser's own message. Otherwise it would be recompiled for every
		// interface and its cause discarded.
		re, err := regexp.Compile(ifregex)
		if err != nil {
			return nil, fmt.Errorf("invalid interface regex %s: %s", ifregex, err)
		}

		ifaces, err := net.Interfaces()
		if err != nil {
			return nil, fmt.Errorf("error listing all interfaces: %s", err)
		}

		// Check IP
		for i := range ifaces {
			ifaceIP, err := ip.GetIfaceIP4Addr(&ifaces[i])
			if err != nil {
				// Skip if there is no IPv4 address
				continue
			}
			if re.MatchString(ifaceIP.String()) {
				ifaceAddr = ifaceIP
				iface = &ifaces[i]
				break
			}
		}

		// Check Name
		if iface == nil && ifaceAddr == nil {
			for i := range ifaces {
				if re.MatchString(ifaces[i].Name) {
					iface = &ifaces[i]
					break
				}
			}
		}

		// Check that nothing was matched
		if iface == nil {
			var availableFaces []string
			for i := range ifaces {
				// Errors are deliberately ignored: an interface without IPv4
				// is simply listed without a usable address.
				addr, _ := ip.GetIfaceIP4Addr(&ifaces[i])
				availableFaces = append(availableFaces, fmt.Sprintf("%s:%s", ifaces[i].Name, addr))
			}
			return nil, fmt.Errorf("Could not match pattern %s to any of the available network interfaces (%s)", ifregex, strings.Join(availableFaces, ", "))
		}
	} else {
		log.Info("Determining IP address of default interface")
		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
			return nil, fmt.Errorf("failed to get default interface: %s", err)
		}
	}

	if ifaceAddr == nil {
		ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
		if err != nil {
			return nil, fmt.Errorf("failed to find IPv4 address for interface %s: %s", iface.Name, err)
		}
	}

	log.Infof("Using interface with name %s and address %s", iface.Name, ifaceAddr)

	if iface.MTU == 0 {
		return nil, fmt.Errorf("failed to determine MTU for %s interface", ifaceAddr)
	}

	var extAddr net.IP

	if len(opts.publicIP) > 0 {
		extAddr = net.ParseIP(opts.publicIP)
		if extAddr == nil {
			return nil, fmt.Errorf("invalid public IP address: %s", opts.publicIP)
		}
		log.Infof("Using %s as external address", extAddr)
	}

	if extAddr == nil {
		log.Infof("Defaulting external address to interface address (%s)", ifaceAddr)
		extAddr = ifaceAddr
	}

	return &backend.ExternalInterface{
		Iface:     iface,
		IfaceAddr: ifaceAddr,
		ExtAddr:   extAddr,
	}, nil
}
// WriteSubnetFile writes FLANNEL_NETWORK, FLANNEL_SUBNET, FLANNEL_MTU and
// FLANNEL_IPMASQ assignments (one per line) to path. FLANNEL_SUBNET is the
// first usable address of this node's lease (lease IP + 1) with the lease's
// prefix length.
//
// The content goes to a hidden temporary file in the same directory, which is
// then renamed over path. Source and destination share a directory, and
// therefore a filesystem, so rename(2) replaces path atomically on POSIX
// systems. Readers never see a partially written file.
//
// Errors creating the directory, writing or closing the temporary file, or
// renaming it are returned; the temporary file is removed on failure.
func WriteSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
	dir, name := filepath.Split(path)
	// A bare file name yields dir == "": the current directory needs no creation.
	if dir != "" {
		if err := os.MkdirAll(dir, 0755); err != nil {
			return err
		}
	}
	tempFile := filepath.Join(dir, "."+name)

	// Write out the first usable IP by incrementing sn.IP by one
	sn := bn.Lease().Subnet
	sn.IP++
	contents := fmt.Sprintf("FLANNEL_NETWORK=%s\nFLANNEL_SUBNET=%s\nFLANNEL_MTU=%d\nFLANNEL_IPMASQ=%v\n",
		nw, sn, bn.MTU(), ipMasq)

	f, err := os.Create(tempFile)
	if err != nil {
		return err
	}
	_, err = f.WriteString(contents)
	// Close can report write errors that were deferred by the OS.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		os.Remove(tempFile) // best-effort cleanup; the write error is what matters
		return err
	}

	if err := os.Rename(tempFile, path); err != nil {
		os.Remove(tempFile) // best-effort cleanup
		return err
	}
	return nil
}
// mustRunHealthz serves a liveness endpoint at /healthz on
// healthzIP:healthzPort, registered on http.DefaultServeMux. Every request to
// /healthz is answered with 200 and the body "flanneld is running". It blocks
// while the server runs. If the server fails (e.g. the port is already in
// use), it logs the error and panics, taking the process down.
func mustRunHealthz() {
	listenAddr := net.JoinHostPort(opts.healthzIP, strconv.Itoa(opts.healthzPort))
	log.Infof("Start healthz server on %s", listenAddr)

	healthz := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("flanneld is running"))
	}
	http.HandleFunc("/healthz", healthz)

	err := http.ListenAndServe(listenAddr, nil)
	if err == nil {
		return
	}
	log.Errorf("Start healthz server error. %v", err)
	panic(err)
}
// ReadCIDRFromSubnetFile returns the CIDR stored under CIDRKey in the
// env-style file at path (the file WriteSubnetFile produces).
//
// The zero ip.IP4Net is returned when the file does not exist, the key is
// absent, or the file cannot be read (the read failure is logged). A value
// that fails to parse is logged, and whatever UnmarshalJSON left in the result
// is returned.
func ReadCIDRFromSubnetFile(path string, CIDRKey string) ip.IP4Net {
	var cidr ip.IP4Net

	if _, statErr := os.Stat(path); os.IsNotExist(statErr) {
		return cidr
	}

	vals, err := godotenv.Read(path)
	if err != nil {
		log.Errorf("Couldn't fetch previous %s from subnet file at %s: %s", CIDRKey, path, err)
		return cidr
	}

	raw, found := vals[CIDRKey]
	if !found {
		return cidr
	}

	if err := cidr.UnmarshalJSON([]byte(raw)); err != nil {
		log.Errorf("Couldn't parse previous %s from subnet file at %s: %s", CIDRKey, path, err)
	}
	return cidr
}