Condivisione della tecnologia

56. Spiegazione dettagliata della configurazione dell'esecuzione della gestione di Flink DataStream

2024-07-08

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1. Panoramica
1. Eseguire la configurazione

StreamExecutionEnvironment ContieneExecutionConfig, che consente di impostare valori di configurazione specifici del lavoro in fase di esecuzione.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

Di seguito sono riportate le opzioni di configurazione disponibili: (grassetto per impostazione predefinita)

  • setClosureCleanerLevel() . Il livello di pulizia della chiusura è impostato per impostazione predefinita suClosureCleanerLevel.RECURSIVE . Il pulitore di chiusura rimuove i riferimenti non necessari alle classi di chiamata di funzioni anonime nel programma Flink. Dopo aver disabilitato il pulitore di chiusura, la funzione anonima dell'utente potrebbe fare riferimento ad alcune classi chiamanti non serializzabili. Ciò farà sì che il serializzatore generi un'eccezione. I valori impostabili sono:NONE: disabilita completamente il pulitore di chiusura,TOP_LEVEL: pulisce solo le classi di livello superiore senza ricorrere ai campi,RECURSIVE: pulisce tutti i campi in modo ricorsivo.
  • getParallelism() / setParallelism(int parallelism) . Imposta il grado di parallelismo predefinito per i lavori.
  • getMaxParallelism() / setMaxParallelism(int parallelism) . Imposta il parallelismo massimo predefinito per i lavori. Questa impostazione determina il grado massimo di parallelismo e specifica un limite superiore sul ridimensionamento dinamico.
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) . Imposta il numero di volte in cui le attività non riuscite vengono rieseguite. Un valore pari a zero disabilita effettivamente la tolleranza agli errori.-1 Indica l'utilizzo delle impostazioni predefinite del sistema (definite nella configurazione).Questa configurazione è deprecata, utilizza invece la strategia di riavvio.
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) . Imposta il ritardo (in millisecondi) che il sistema attende prima di rieseguire un lavoro dopo che ha avuto esito negativo. Dopo che tutte le attività sono state arrestate con successo sui TaskManager, il ritardo viene calcolato e una volta trascorso il ritardo, le attività vengono riavviate.Questa configurazione è stata obsoleta, utilizza invece la strategia di riavvio.
  • getExecutionMode() / setExecutionMode() . La modalità di esecuzione predefinita è PIPELINED. Imposta la modalità di esecuzione per eseguire il programma. La modalità di esecuzione definisce se lo scambio di dati viene eseguito in modalità batch o streaming.
  • enableForceKryo() / disableForceKryo . Kryo non è obbligato ad essere utilizzato per impostazione predefinita. Forza GenericTypeInformation a utilizzare il serializzatore Kryo per i POJO anche se possono essere analizzati come POJO. In alcuni casi, è preferibile abilitare questa configurazione. Ad esempio, quando il serializzatore interno di Flink non riesce a gestire correttamente i POJO.
  • enableForceAvro() / disableForceAvro() . Avro non è obbligato a essere utilizzato per impostazione predefinita. Forza Flink AvroTypeInfo a utilizzare il serializzatore Avro invece di Kryo per serializzare i POJO Avro.
  • enableObjectReuse() / disableObjectReuse() . Per impostazione predefinita, gli oggetti non vengono riutilizzati in Flink. L'abilitazione della modalità di riutilizzo degli oggetti indica al runtime di riutilizzare gli oggetti utente per prestazioni migliori. Tieni presente che ciò potrebbe causare bug.
  • getGlobalJobParameters() / setGlobalJobParameters() . Questo metodo consente all'utente di impostare un oggetto personalizzato come configurazione globale del lavoro.PerchéExecutionConfig Accessibile in tutte le funzioni definite dall'utente, quindi questo è un modo semplice per rendere le configurazioni disponibili a livello globale tra i lavori.
  • addDefaultKryoSerializer(Class type, Serializer serializer) . Registra un'istanza del serializzatore Kryo per il tipo specificato.
  • addDefaultKryoSerializer(Class type, Class