Partage de technologie

56. Explication détaillée de la configuration de l'exécution de la gestion de Flink DataStream

2024-07-08

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Vue d'ensemble
1. Exécuter la configuration

StreamExecutionEnvironment ContientExecutionConfig, qui permet de définir des valeurs de configuration spécifiques au travail au moment de l'exécution.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

Voici les options de configuration disponibles : (gras par défaut)

  • setClosureCleanerLevel() . Le niveau de nettoyage des fermetures est réglé par défaut surClosureCleanerLevel.RECURSIVE . Le nettoyeur de fermeture supprime les références inutiles aux classes d'appel de fonctions anonymes dans le programme Flink. Après avoir désactivé le nettoyeur de fermeture, la fonction anonyme de l'utilisateur peut faire référence à une classe appelante non sérialisable. Cela entraînera le sérialiseur à lever une exception. Les valeurs paramétrables sont :NONE: désactiver complètement le nettoyeur de fermeture,TOP_LEVEL : Nettoyer uniquement les classes de niveau supérieur sans récursion dans les champs,RECURSIVE: Nettoyer tous les champs de manière récursive.
  • getParallelism() / setParallelism(int parallelism) . Définit le degré de parallélisme par défaut pour les tâches.
  • getMaxParallelism() / setMaxParallelism(int parallelism) . Définit le parallélisme maximum par défaut pour les tâches. Ce paramètre détermine le degré maximum de parallélisme et spécifie une limite supérieure pour la mise à l'échelle dynamique.
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) . Définissez le nombre de fois où les tâches ayant échoué sont réexécutées. Une valeur de zéro désactive effectivement la tolérance aux pannes.-1 Indique l'utilisation des valeurs par défaut du système (définies dans la configuration).Cette configuration est obsolète, veuillez plutôt utiliser la stratégie de redémarrage.
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) . Définit le délai (en millisecondes) que le système attend avant de réexécuter une tâche après son échec. Une fois que toutes les tâches ont été arrêtées avec succès sur les TaskManagers, le délai est calculé et une fois le délai écoulé, les tâches sont redémarrées.Cette configuration est obsolète, veuillez plutôt utiliser la stratégie de redémarrage.
  • getExecutionMode() / setExecutionMode() . Le mode d'exécution par défaut est PIPELINED. Définissez le mode d'exécution pour exécuter le programme. Le mode d'exécution définit si l'échange de données s'effectue en mode batch ou streaming.
  • enableForceKryo() / disableForceKryo . Kryo n'est pas obligé d'être utilisé par défaut. Force GenericTypeInformation à utiliser le sérialiseur Kryo pour les POJO même s'ils peuvent être analysés en tant que POJO. Dans certains cas, il est préférable d'activer cette configuration. Par exemple, lorsque le sérialiseur interne de Flink ne parvient pas à gérer correctement les POJO.
  • enableForceAvro() / disableForceAvro() . Avro n'est pas obligé d'être utilisé par défaut. Forcez Flink AvroTypeInfo à utiliser le sérialiseur Avro au lieu de Kryo pour sérialiser les POJO Avro.
  • enableObjectReuse() / disableObjectReuse() . Par défaut, les objets ne sont pas réutilisés dans Flink. L'activation du mode de réutilisation des objets demande au runtime de réutiliser les objets utilisateur pour de meilleures performances. Notez que cela peut provoquer des bugs.
  • getGlobalJobParameters() / setGlobalJobParameters() . Cette méthode permet à l'utilisateur de définir un objet personnalisé dans la configuration globale du travail.parce queExecutionConfig Accessible dans toutes les fonctions définies par l'utilisateur, il s'agit donc d'un moyen simple de rendre les configurations disponibles à l'échelle mondiale pour toutes les tâches.
  • addDefaultKryoSerializer(Class type, Serializer serializer) . Enregistre une instance de sérialiseur Kryo pour le type spécifié.
  • addDefaultKryoSerializer(Class type, Class