from enum import Enum from functools import reduce from scipy import integrate import itertools DELIMETER = "," class FuzzDistance(Enum): Short = 0 Medium = 1 Long = 2 def membership_grade(self, distance: float) -> float: k = 1 / 50 fns = { FuzzDistance.Short: lambda t: -k * (t - 50), FuzzDistance.Medium: lambda t: min(k * (t - 0), -k * (t - 100)), FuzzDistance.Long: lambda t: min(k * (t - 50), 1), } return max(0, fns[self](distance)) class FuzzVelocity(Enum): Low = 0 Medium = 1 High = 2 def membership_grade(self, v: float) -> float: k = 1 / 50 fns = { FuzzVelocity.Low: lambda t: -k * (t - 50), FuzzVelocity.Medium: lambda t: min(k * (t - 0), -k * (t - 100)), FuzzVelocity.High: lambda t: k * (t - 50), } return max(0, fns[self](v)) class FuzzThreat(Enum): VL = 0 L = 1 M = 2 H = 3 VH = 4 def reverse_membership_grade(self) -> float: v = [0, 0.25, 0.5, 0.75, 1] return v[self.value] def membership_grade(self, threat: float) -> float: k = 1 / 0.25 fns = { FuzzThreat.VL: lambda t: -k * (t - 0.25), FuzzThreat.L: lambda t: min(k * (t - 0), -k * (t - 0.5)), FuzzThreat.M: lambda t: min(k * (t - 0.25), -k * (t - 0.75)), FuzzThreat.H: lambda t: min(k * (t - 0.5), -k * (t - 1)), FuzzThreat.VH: lambda t: k * (t - 0.75), } return max(0, fns[self](threat)) class Rule: def __init__( self, dirtness: FuzzVelocity, type_of_dirt: FuzzDistance, wash_time: FuzzThreat ) -> None: self.dirtness = dirtness self.type_of_dirt = type_of_dirt self.wash_time = wash_time def rule_support(self, dirtness: float, type_of_dirt: float) -> float: # 计算隶属度 dirtness_grade = self.dirtness.membership_grade(dirtness) type_of_dirt_grade = self.type_of_dirt.membership_grade(type_of_dirt) # 计算t范数 min return min(dirtness_grade, type_of_dirt_grade) class InferenceEngine: def __init__(self, rules: list[Rule]) -> None: self.rules = rules def output( self, dirtness: float, type_of_dirt: float ) -> list[tuple[float, FuzzThreat]]: # 使用规则 ans = map( lambda rule: (rule.rule_support(dirtness, type_of_dirt), rule.wash_time), self.rules, ) ans = list(ans) return ans # 聚合模糊输出,使用离散的质心计算方式,数学积 def aggregate_discret_AP(pairs: list[tuple[float, FuzzThreat]]) -> float: return sum( map(lambda pair: pair[0] * pair[1].reverse_membership_grade(), pairs) ) / sum(map(lambda pair: pair[0], pairs)) # 使用连续的质心计算方式,数学积和代数和 def aggregate_continue_AP(pairs: list[tuple[float, FuzzThreat]]) -> float: def f(time: float) -> float: return S_AS(map(lambda pair: pair[0] * pair[1].membership_grade(time), pairs)) base = integrate.quad(f, 0, 1)[0] return integrate.quad(lambda v: v * f(v), 0, 1)[0] / base def S_AS(iter) -> float: "代数和,输入数组" ans = 0 iter = tuple(iter) for i in range(len(iter)): n = i + 1 if n & 1: flag = 1 else: flag = -1 # 选择n个数,相乘再相加 for collection in itertools.combinations(iter, n): ans += flag * reduce(lambda x, y: x * y, collection) return ans # 标准交 def aggregate_continue_SI(pairs: list[tuple[float, FuzzThreat]]) -> float: """lambda: fi=min(alpha, mu), f=max(fi)""" def f(time: float) -> float: return max( map(lambda pair: min(pair[0], pair[1].membership_grade(time)), pairs) ) return integrate.quad(lambda v: v * f(v), 0, 1)[0] / integrate.quad(f, 0, 1)[0] rules = [ Rule(FuzzVelocity.Low, FuzzDistance.Short, FuzzThreat.H), Rule(FuzzVelocity.Medium, FuzzDistance.Short, FuzzThreat.H), Rule(FuzzVelocity.High, FuzzDistance.Short, FuzzThreat.VH), Rule(FuzzVelocity.Low, FuzzDistance.Medium, FuzzThreat.M), Rule(FuzzVelocity.Medium, FuzzDistance.Medium, FuzzThreat.M), Rule(FuzzVelocity.High, FuzzDistance.Medium, FuzzThreat.VH), Rule(FuzzVelocity.Low, FuzzDistance.Long, FuzzThreat.VL), Rule(FuzzVelocity.Medium, FuzzDistance.Long, FuzzThreat.L), Rule(FuzzVelocity.High, FuzzDistance.Long, FuzzThreat.M), ] inputs = [ [0, 0], [10, 10], [50, 10], [10, 50], [50, 50], [10, 70], [50, 70], [70, 10], [70, 50], [70, 70], [100, 100], ] def main(inputs, rules): print(DELIMETER.join(["敌机速度", "敌机距离", "威胁"])) engine = InferenceEngine(rules) for input in inputs: # print("脏的程度", input[0], "脏的类型", input[1]) outputs = engine.output(input[0], input[1]) output = aggregate_discret_AP(outputs) output1 = aggregate_continue_AP(outputs) output2 = aggregate_continue_SI(outputs) print( DELIMETER.join(["{:.2f}"] * 5).format( input[0], input[1], output, output1, output2 ) ) if __name__ == "__main__": main(inputs, rules)