aboutsummaryrefslogtreecommitdiff
blob: 16e29cab1d3895febfacd5ee335fe7871113e52c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os

from snakeoil.osutils import pjoin
from snakeoil.test import TestCase
from snakeoil.test.mixins import tempdir_decorator

from pkgcore.fs import livefs
from pkgcore.fs.contents import contentsSet
from pkgcore.merge import engine

from ..fs.fs_util import fsDir, fsFile, fsSymlink
from .util import fake_engine


class fake_pkg:

    def __init__(self, contents, label=None):
        self.label = label
        self.contents = contents

    def __str__(self):
        return f"fake_pkg: {self.label}"


class Test_MergeEngineCsets(TestCase):

    simple_cset = list(fsFile(x) for x in ("/foon", "/usr/dar", "/blah"))
    simple_cset.extend(fsDir(x) for x in ("/usr", "/usr/lib"))
    simple_cset.append(fsSymlink("/usr/lib/blah", "../../blah"))
    simple_cset.append(fsSymlink("/broken-symlink", "dar"))
    simple_cset = contentsSet(simple_cset, mutable=False)

    kls = engine.MergeEngine

    def assertCsetEqual(self, cset1, cset2):
        if not isinstance(cset1, contentsSet):
            cset1 = contentsSet(cset1)
        if not isinstance(cset2, contentsSet):
            cset2 = contentsSet(cset2)
        self.assertEqual(cset1, cset2, reflective=False)

    def assertCsetNotEqual(self, cset1, cset2):
        if not isinstance(cset1, contentsSet):
            cset1 = contentsSet(cset1)
        if not isinstance(cset2, contentsSet):
            cset2 = contentsSet(cset2)
        self.assertNotEqual(cset1, cset2, reflective=False)

    def run_cset(self, target, engine, *args):
        return getattr(self.kls, target)(engine, engine.csets, *args)

    def test_generate_offset_cset(self):
        engine = fake_engine(csets={"new_cset":self.simple_cset},
            offset='/')
        def run(engine, cset):
            return self.run_cset('generate_offset_cset', engine,
                lambda e, c:c[cset])

        self.assertCsetEqual(self.simple_cset, run(engine, 'new_cset'))
        engine.offset = '/foon/'
        run(engine, 'new_cset')
        self.assertCsetEqual(self.simple_cset.insert_offset(engine.offset),
            run(engine, 'new_cset'))

    def test_get_pkg_contents(self):
        new_cset = self.kls.get_pkg_contents(None, None, fake_pkg(self.simple_cset))
        self.assertCsetEqual(self.simple_cset, new_cset)
        # must differ; shouldn't be modifying the original cset
        self.assertNotIdentical(self.simple_cset, new_cset)

    def test_get_remove_cset(self):
        files = contentsSet(self.simple_cset.iterfiles(invert=True))
        engine = fake_engine(csets={'install':files,
            'old_cset':self.simple_cset})
        self.assertCsetEqual(self.simple_cset.iterfiles(),
            self.run_cset('get_remove_cset', engine))

    def test_get_replace_cset(self):
        files = contentsSet(self.simple_cset.iterfiles(invert=True))
        engine = fake_engine(csets={'install':files,
            'old_cset':self.simple_cset})
        self.assertCsetEqual(files,
            self.run_cset('get_replace_cset', engine))

    @tempdir_decorator
    def test_rewrite_awareness(self):
        src = contentsSet(self.simple_cset)
        src.add(fsFile("/usr/lib/donkey"))
        trg = src.difference(["/usr/lib/donkey"])
        trg.add(fsFile("/usr/lib64/donkey"))
        trg = trg.insert_offset(self.dir)
        os.mkdir(pjoin(self.dir, 'usr'))
        os.mkdir(pjoin(self.dir, 'usr', 'lib64'))
        os.symlink('lib64', pjoin(self.dir, 'usr', 'lib'))
        pkg = fake_pkg(src)
        engine = self.kls.install(self.dir, pkg, offset=self.dir)
        result = engine.csets['resolved_install']
        self.assertEqual(sorted(result.iterfiles()), sorted(trg.iterfiles()))

    @tempdir_decorator
    def test_symlink_awareness(self):
        src = contentsSet(self.simple_cset)
        src.add(fsFile("/usr/lib/blah/donkey"))
        trg = src.difference(["/usr/lib/blah/donkey"])
        trg.add(fsFile("/blah/donkey"))
        trg = trg.insert_offset(self.dir)
        pkg = fake_pkg(src)
        engine = self.kls.install(self.dir, pkg, offset=self.dir)
        result = engine.csets['new_cset']
        self.assertEqual(sorted(result.iterfiles()), sorted(trg.iterfiles()))
    test_symlink_awareness.skip = "contentset should handle this"

    @tempdir_decorator
    def test__get_livefs_intersect_cset(self):
        old_cset = self.simple_cset.insert_offset(self.dir)
        # have to add it; scan adds the root node
        old_cset.add(fsDir(self.dir))
        os.mkdir(pjoin(self.dir, "usr"))
        open(pjoin(self.dir, "usr", "dar"), 'w').close()
        open(pjoin(self.dir, 'foon'), 'w').close()
        # note that this *is* a sym in the cset; adding this specific
        # check so that if the code differs, the test breaks, and the tests
        # get updated (additionally, folks may not be aware of the potential)
        open(pjoin(self.dir, 'broken-symlink'), 'w').close()
        engine = fake_engine(csets={'test':old_cset})
        existent = livefs.scan(self.dir)
        generated = self.run_cset('_get_livefs_intersect_cset', engine,
            'test')
        self.assertEqual(generated, existent)