Technologieaustausch

56. Detaillierte Erläuterung der Verwaltungsausführungskonfiguration von Flink DataStream

2024-07-08

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Übersicht
1. Konfiguration durchführen

StreamExecutionEnvironment EnthältExecutionConfig, wodurch auftragsspezifische Konfigurationswerte zur Laufzeit festgelegt werden können.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

Im Folgenden sind die verfügbaren Konfigurationsoptionen aufgeführt: (standardmäßig fett)

  • setClosureCleanerLevel() . Die Verschlussreinigungsstufe ist standardmäßig auf eingestelltClosureCleanerLevel.RECURSIVE . Der Closing Cleaner entfernt unnötige Verweise auf anonyme Funktionsaufrufklassen im Flink-Programm. Nach dem Deaktivieren des Verschlussreinigers verweist die anonyme Funktion des Benutzers möglicherweise auf eine nicht serialisierbare Aufrufklasse. Dies führt dazu, dass der Serialisierer eine Ausnahme auslöst. Die einstellbaren Werte sind:NONE: Verschlussreiniger komplett deaktivieren,TOP_LEVEL: Nur Klassen der obersten Ebene bereinigen, ohne in Felder zu rekursieren.RECURSIVE: Alle Felder rekursiv bereinigen.
  • getParallelism() / setParallelism(int parallelism) . Legt den Standardgrad der Parallelität für Jobs fest.
  • getMaxParallelism() / setMaxParallelism(int parallelism) . Legt die standardmäßige maximale Parallelität für Jobs fest. Diese Einstellung bestimmt den maximalen Grad der Parallelität und gibt eine Obergrenze für die dynamische Skalierung an.
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) . Legen Sie fest, wie oft fehlgeschlagene Aufgaben erneut ausgeführt werden. Ein Wert von Null deaktiviert effektiv die Fehlertoleranz.-1 Zeigt die Verwendung von Systemstandards an (in der Konfiguration definiert).Diese Konfiguration ist veraltet. Bitte verwenden Sie stattdessen die Neustartstrategie.
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) . Legt die Verzögerung (in Millisekunden) fest, die das System wartet, bevor ein Auftrag nach einem Fehlschlag erneut ausgeführt wird. Nachdem alle Aufgaben auf TaskManagern erfolgreich gestoppt wurden, wird die Verzögerung berechnet und nach Ablauf der Verzögerung werden die Aufgaben neu gestartet.Diese Konfiguration ist veraltet. Bitte verwenden Sie stattdessen die Neustartstrategie.
  • getExecutionMode() / setExecutionMode() . Der Standardausführungsmodus ist PIPELINED. Legen Sie den Ausführungsmodus fest, um das Programm auszuführen. Der Ausführungsmodus legt fest, ob der Datenaustausch im Batch- oder Streaming-Modus erfolgt.
  • enableForceKryo() / disableForceKryo . Kryo muss nicht standardmäßig verwendet werden. Erzwingt, dass GenericTypeInformation den Kryo-Serializer für POJOs verwendet, obwohl diese als POJOs analysiert werden können. In manchen Fällen sollte die Aktivierung dieser Konfiguration bevorzugt werden. Zum Beispiel, wenn der interne Serialisierer von Flink POJOs nicht richtig verarbeiten kann.
  • enableForceAvro() / disableForceAvro() . Avro muss nicht standardmäßig verwendet werden. Erzwingen Sie, dass Flink AvroTypeInfo den Avro-Serializer anstelle von Kryo verwendet, um Avro-POJOs zu serialisieren.
  • enableObjectReuse() / disableObjectReuse() . Standardmäßig werden Objekte in Flink nicht wiederverwendet. Durch die Aktivierung des Objektwiederverwendungsmodus wird die Laufzeit angewiesen, Benutzerobjekte für eine bessere Leistung wiederzuverwenden. Beachten Sie, dass dies zu Fehlern führen kann.
  • getGlobalJobParameters() / setGlobalJobParameters() . Mit dieser Methode kann der Benutzer ein benutzerdefiniertes Objekt als globale Konfiguration des Jobs festlegen.WeilExecutionConfig In allen benutzerdefinierten Funktionen zugänglich, daher ist dies eine einfache Möglichkeit, Konfigurationen global und auftragsübergreifend verfügbar zu machen.
  • addDefaultKryoSerializer(Class type, Serializer serializer) . Registriert eine Kryo-Serializer-Instanz für den angegebenen Typ.
  • addDefaultKryoSerializer(Class type, Class