#!/usr/bin/env python """ Detect outdated shared libraries. Detect and report not up-to-date shared libraries that used by running processes. Detection based on BuildID comparison and aware of deleted or replaced files. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = 'Rinat Sabitov' __copyright__ = "Copyright (c) Cloud Linux GmbH & Cloud Linux Software, Inc" __license__ = "GPLv2" __maintainer__ = 'Rinat Sabitov' __email__ = 'rsabitov@cloudlinux.com' __status__ = 'Beta' __version__ = '0.1' import os import re import json import struct import logging import subprocess from collections import namedtuple ELF64_HEADER = "<16sHHIQQQIHHHHHH" ELF_PH_HEADER = " /dev/null 2>&1') != 0: logging.debug("Libcare service is not running.") return result try: std_out = check_output([LIBCARE_CLIENT, 'info', '-j']) for line in std_out.splitlines(): try: item = json.loads(line) for v in item.values(): if isinstance(v, dict) and 'buildid' in v: result.add((item['pid'], v['buildid'])) except ValueError as e: logging.debug("Can't parse `%s`: %s", line, e) except Exception as e: logging.debug("Can't read libcare info: %s", e) return result def cached(clbl): data = {} def wrapper(*args): if args not in data: data[args] = clbl(*args) return data[args] wrapper.clear = data.clear return wrapper @cached def get_dist_data(dist): userspace_data = json.load(urlopen(USERSPACE_JSON)) for dist_re, dist_data in userspace_data.items(): if re.match(dist_re, dist): logging.debug("Distro `%s` was matched by `%s`", dist, dist_re) # Handle references if 'ref-' in dist_data: logging.debug("Distro reference detected: `%s`", dist_data) dist_data = userspace_data.get(dist_data[4:]) or {} return dist_data return {} class NotAnELFException(Exception): pass class BuildIDParsingException(Exception): pass def get_build_id(fileobj): try: header = fileobj.read(struct.calcsize(ELF64_HEADER)) hdr = struct.unpack(ELF64_HEADER, header) except struct.error as err: # Can't read ELF header raise NotAnELFException("Can't read header: {0}".format(err)) (e_ident, e_type, e_machine, e_version, e_entry, e_phoff, e_shoff, e_flags, e_ehsize, e_phentsize, e_phnum, e_shentsize, e_shnum, e_shstrndx) = hdr # Not an ELF file if not e_ident.startswith(b'\x7fELF\x02\x01'): raise NotAnELFException("Wrong header") # No program headers if not e_phoff: raise BuildIDParsingException("Program headers not found.") logging.debug("e_phoff: %d, e_phnum: %d, e_phentsize: %d", e_phoff, e_phnum, e_phentsize) fileobj.seek(e_phoff) for idx in range(e_phnum): ph = fileobj.read(e_phentsize) (p_type, p_flags, p_offset, p_vaddr, p_paddr, p_filesz, p_memsz, p_align) = struct.unpack(ELF_PH_HEADER, ph) logging.debug("p_idx: %d, p_type: %d", idx, p_type) if p_type == PT_NOTE: logging.debug("p_offset: %d, p_filesz: %d", p_offset, p_filesz) p_end = p_offset + p_filesz fileobj.seek(p_offset) n_type = None while n_type not in (NT_GNU_BUILD_ID, NT_GO_BUILD_ID) and fileobj.tell() <= p_end: nhdr = fileobj.read(struct.calcsize(ELF_NHDR)) n_namesz, n_descsz, n_type = struct.unpack(ELF_NHDR, nhdr) # 4-byte align if n_namesz % 4: n_namesz = ((n_namesz // 4) + 1) * 4 if n_descsz % 4: n_descsz = ((n_descsz // 4) + 1) * 4 logging.debug("n_type: %d, n_namesz: %d, n_descsz: %d)", n_type, n_namesz, n_descsz) fileobj.read(n_namesz) desc = struct.unpack("<{0}B".format(n_descsz), fileobj.read(n_descsz)) if n_type is not None: return ''.join('{0:02x}'.format(x) for x in desc) # Nothing was found raise BuildIDParsingException("Program header PT_NOTE with NT_GNU_BUILD_ID was not found.") def iter_maps(pid): try: with open('/proc/{0:d}/maps'.format(pid), 'r') as mapfd: for line in mapfd: data = (line.split() + [None, None])[:7] yield Map(*data) except IOError as err: # Most cases of IOErrors is a lack of maps due to process exit logging.debug("Iter via `%d` map error: %s", pid, err) def get_vmas(pid, inode): result = [] for mmap in iter_maps(pid): if mmap.inode == inode: start, _, end = mmap.addr.partition('-') offset, start, end = map(lambda x: int(x, 16), [mmap.offset, start, end]) rng = Vma(offset, end - start, start, end) result.append(rng) return result def is_valid_file_mmap(mmap): return mmap.pathname \ and mmap.pathname not in IGNORED_PATHNAME \ and not mmap.pathname.startswith('anon_inode:') \ and not mmap.pathname.startswith('/dev/') def get_process_files(pid): result = set() for mmap in iter_maps(pid): if is_valid_file_mmap(mmap): pathname, _, _ = mmap.pathname.partition(';') result.add((pathname, mmap.inode)) return result class FileMMapped(object): def __init__(self, pid, inode): self.fileobj = open('/proc/{0:d}/mem'.format(pid), 'rb') self.vmas = get_vmas(pid, inode) self.pos = 0 self.fileobj.seek(self._get_vma(0).start) def _get_vma(self, offset): for rng in self.vmas: if rng.offset <= offset < rng.offset + rng.size: return rng raise IOError("Offset {0} is not in ranges {1}".format(offset, self.vmas)) def tell(self): return self.pos def __enter__(self): return self def __exit__(self, type, value, traceback): self.fileobj.close() def close(self): self.fileobj.close() def seek(self, offset, whence=0): rng = self._get_vma(offset) addr = rng.start + (offset - rng.offset) self.fileobj.seek(addr, whence) self.pos = offset def read(self, size): result = self.fileobj.read(size) self.pos += len(result) return result open_mmapped = FileMMapped def get_comm(pid): comm_filename = '/proc/{0:d}/comm'.format(pid) with open(comm_filename, 'r') as fd: return fd.read().strip() def iter_pids(): for pid in os.listdir('/proc/'): try: yield int(pid) except ValueError: pass def iter_proc_map(): for pid in iter_pids(): for pathname, inode in get_process_files(pid): yield pid, inode, pathname def get_fileobj(pid, inode, pathname): logging.debug("path: %s", pathname) # If mapped file exists and has the same inode if os.path.isfile(pathname) and os.stat(pathname).st_ino == int(inode): fileobj = open(pathname, 'rb') # If file exists only as a mapped to the memory else: fileobj = open_mmapped(pid, inode) logging.warning("Library `%s` was gathered from memory.", pathname) return fileobj def iter_proc_lib(): cache = {} for pid, inode, pathname in iter_proc_map(): if inode not in cache: try: with get_fileobj(pid, inode, pathname) as fileobj: cache[inode] = get_build_id(fileobj) except (NotAnELFException, BuildIDParsingException, IOError) as err: logging.info("Can't read buildID from {0}: {1}".format(pathname, repr(err))) cache[inode] = None except Exception as err: logging.error("Can't read buildID from {0}: {1}".format(pathname, repr(err))) cache[inode] = None build_id = cache[inode] yield pid, os.path.basename(pathname), build_id @cached def get_kcplus_data(): return set(json.load(urlopen(KCARE_PLUS_JSON)).keys()) def is_kcplus_handled(build_id): return build_id in get_kcplus_data() def is_up_to_date(libname, build_id, dist): subset = get_dist_data(dist).get(libname, {}) if not subset: logging.warning('No data for %s/%s.', dist, libname) return (not subset) or (build_id in subset) def main(): failed = False dist = get_dist() logging.info("Distro detected: %s", dist) if not get_dist_data(dist): logging.error("Distro `%s` is not supported", dist) exit(1) patched_data = get_patched_data() for pid, libname, build_id in iter_proc_lib(): comm = get_comm(pid) logging.info("For %s[%s] `%s` was found with buid id = %s", comm, pid, libname, build_id) if build_id and (pid, build_id) not in patched_data\ and not is_up_to_date(libname, build_id, dist): failed = True logging.error( "[%s] Process %s[%d] linked to the `%s` that is not up to date.", "*" if is_kcplus_handled(build_id) else " ", comm, pid, libname) if not failed: print("It looks OK. We didn't find any outdated libraries.") else: print("\nYou may want to update libraries above and restart " "corresponding processes.\n\n KernelCare+ allows to resolve " "such issues with no process downtime. To find out more, please," " visit https://tuxcare.com/enterprise-live-patching-services/libcare/") return 0 if not failed else 1 if __name__ == '__main__': logging.basicConfig(level=LOGLEVEL, format='%(message)s') exit(main())